分布式消息队列-Celery

怎么能不恨呢,在我发现自己是恶鬼的时候,在我最绝望最虚弱的时候,这个世上最该跟我在一起的人却用刀把我的心刺穿了

Celery 是 Distributed Task Queue,分布式任务队列。分布式决定了可以有多个 worker 的存在,队列表示其是异步操作。它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

安装

Celery4.x 开始不再支持Windows平台了。下面装的是3.1.25。

安装使用命令

pip3 install celery==3.1.25

查看是否安装成功,使用命令即可:

celery --version

如果在win10下想要使用celery4.x的话,可以这么做:

pip3 install eventlet

运行worker的时候加上一个参数:

celery -A xxxx worker -l info -P eventlet

然后安装redis(个人比较喜欢redis)

首先下载安装redis windowns服务端

下载地址

解压后,在其目录下执行命令

redis-server.exe 

即可启动redis数据库

然后安装python连接操作redis的库

pip3 install redis==2.10.6

注意版本号

核心主件

celery通过五大模块实现

Task

就是任务,有异步任务和定时任务

Broker

中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。

Worker

执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。

Beat

定时任务调度器,根据配置定时将任务发送给Broler。

Backend

用于存储任务的执行结果。

组成关系

各个角色间的关系看下面这张图理解一下:

初次使用

首先编写一个文件 命名为task1.py

from celery import Celery
app = Celery('tasks',broker='redis://192.168.1.102:6379/0')
# redis://192.168.1.102:6379/0 是redis数据库地址,无需账号密码验证,也是ssrf在获取内网系统权限的方式之一
@app.task
def add(x,y):
    print('传递 {} + {} = {}'.format(x,y,x+y))
    return x+y

然后启动redis数据库

接下来再task1文件夹执行命令

celery -A task1 worker --loglevel=info 

就会看到消息队列都启动

到现在所有的队列都启动,可以向这个队列添加任务等待处理

方法是再task1目录下打开cmd窗口,进入python3交互界面

python3
from task1 import add
add.delay(6,12)
add.delay(6,6)

上面只是一个发送任务的调用,结果是拿不到的。上面也没有接收返回值,这次把返回值保存到起来

修改task1内容

app = Celery('tasks',broker='redis://192.168.1.102:6379/0',backend='redis://192.168.1.102:6379/0')

然后要重启Worker,IDLE也要重启

然后这样就能获取结果了

t = add.delay(1, 1)
t.get()
# 还可以设置超时时间 t.get(timeout=5)

# 如果出错,获取错误结果,不触发异常
# 使用命令t.get(propagate=False)    
# t.traceback  (打印异常详细结果)

# 还可以获取任务状态
# t.ready() 返回True 或者False

在项目中使用Celery

可以把celery配置成一个应用,假设应用名字是CeleryPro,目录格式如下:

CeleryPro
├─__init.py
├─celery.py
├─tasks.py

这里的连接文件命名必须为celery.py,其他名字随意

celery文件

这个文件名必须是celery.py:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='redis://192.168.1.102:6379',
             backend='redis://192.168.1.102:6379',
             include=['CeleryPro.tasks'])
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks文件

这个文件开始两行就多了一个点,这里要导入上面的celery.py文件。后面只要写各种任务加上装饰器就可以了:

from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("计算2个值的和: %s %s" % (x, y))
    return x+y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

启动worker

启动的时候,-A 参数后面用应用名称 CeleryPro 。你还需要cd到你CeleryPro的父级目录上启动,否则找不到:

启动的姿势

这里注意用的都是CeleryPro:

celery -A CeleryPro worker -loglevel=info  # 前台启动不推荐
celery -A CeleryPro worker -l info  # 前台启动简写
celery multi start w1 -A  CeleryPro -l info  # 推荐用后台启动

定时任务

主要修改 celery.py文件

from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery('CeleryPro',
             broker='redis://192.168.1.102',
             backend='redis://192.168.1.102',
             include=['CeleryPro.tasks'])

app.conf.CELERYBEAT_SCHEDULE = {
    'add every 10 seconds': {
        'task': 'CeleryPro.tasks.add',
        'schedule': timedelta(seconds=10),  
        # 可以用timedelta对象
        # 'schedule': 10,  # 也支持直接用数字表示秒数
        'args': (1, 2)
    },
    'upper every 2 minutes': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': crontab(minute='*/2'),
        'args': ('abc', ),
    },
}

# app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

启动使用命令

celery -A CeleryPro beat -l info
celery -A CeleryPro worker -l info

参数解析:

-l info     与--loglevel=info的作用是一样的。
--beat    周期性的运行。即设置 心跳。

新例子

# tasks.py
# coding:utf-8
from celery import Celery,platforms

app = Celery('tasks')
app.config_from_object('config')
platforms.C_FORCE_ROOT = True

@app.task
def add(x,y):
    return x + y

和另一个文件

# config.py
# coding:utf-8
from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta

BROKER_URL = 'redis://127.0.0.1:6379/0'

CELERYBEAT_SCHEDULE = {
    'add-every-2-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=2),
        'args': (16, 10),
    },
}

CELERY_TIMEZONE = 'Asia/Shanghai'

然后打开三个cmd窗口,依次输入:

celery -A tasks beat -l info
celery -A tasks worker -l info
celery -A tasks flower

然后访问本地5555端口即可~

查看异步任务情况

Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现

下实现的方式如下:

安装flower:

pip3 install flower

启动flower(默认会启动一个webserver,端口为5555):

在另一个Terminal中:

celery -A task1 flower

这里的task1是上面创建的py文件

进入

http://localhost:5555

即可查看。

资料文档

集群部署

简书

更多资料

查看进程信息

演示实例

坚持原创技术分享,您的支持将鼓励我继续创作!

-------------本文结束感谢您的阅读-------------

腾讯云主机优惠打折:最新活动地址


版权声明

LangZi_Blog's by Jy Xie is licensed under a Creative Commons BY-NC-ND 4.0 International License
由浪子LangZi创作并维护的Langzi_Blog's博客采用创作共用保留署名-非商业-禁止演绎4.0国际许可证
本文首发于Langzi_Blog's 博客( http://langzi.fun ),版权所有,侵权必究。

0%