Celery is a distributed task queue built on message passing. Unlike a plain message queue, where messages carry data, the messages passed through Celery's queue describe tasks (Task/Job), and Celery manages these task messages for you. Gearman is a similar project, but it has not been updated for a long time, so Celery is the better choice.
Celery supports both synchronous and asynchronous execution. In synchronous mode the caller waits for the task to finish, which is equivalent to RPC (Remote Procedure Call); in asynchronous mode the task runs in the background, and the caller moves on to other work and checks the task result later as needed. Celery does not implement a message queue of its own; instead it uses an existing message queue as the Broker. The officially recommended Broker is RabbitMQ, but Redis, Beanstalkd, MongoDB and others are also supported; see the official documentation for the full list.
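The Broker is chosen simply through the broker URL passed when creating the Celery application. A minimal sketch (hostnames and credentials here are placeholders):

```python
from celery import Celery

# RabbitMQ as the broker: amqp://<user>:<password>@<host>:<port>/<vhost>
app = Celery('myapp', broker='amqp://demo:demo@localhost:5672/demo1')

# Redis would work just as well: redis://<host>:<port>/<db>
# app = Celery('myapp', broker='redis://localhost:6379/0')
```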
Celery's overall architecture comprises three roles:
Celery client: the task producer, responsible for sending tasks to the Broker.
Broker: responsible for dispatching tasks to the appropriate celery workers.
Celery worker: the task executor, which carries out the actual business logic; in practice this is implemented as Python functions.
Let's walk through Celery's usage with an example.
First, prepare the Broker; we use RabbitMQ here. After RabbitMQ is installed and started, create the necessary vhost and user and grant the permissions:
```
rabbitmqctl add_vhost demo1
rabbitmqctl add_user demo demo
rabbitmqctl set_user_tags demo demotag
rabbitmqctl set_permissions -p demo1 demo ".*" ".*" ".*"
```
Create the celery worker code file demo1.py with the following content:
```python
from celery import Celery
import time

app = Celery('demo1',
             broker='amqp://demo:demo@localhost:5672/demo1',
             backend='rpc://')

@app.task
def add(x, y):
    time.sleep(2)
    return x + y

@app.task
def sub(x, y):
    time.sleep(2)
    return x - y
```
demo1 implements two tasks, the functions add and sub. The @app.task decorator tells Celery that these functions are not executed on the celery client side: calling them only sends the invocation information through the Broker to the celery workers. The backend parameter tells Celery to keep the results produced by the workers; rpc:// means results are delivered back through RabbitMQ in RPC (Remote Procedure Call) style. If no backend parameter is specified, task results are discarded.
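Result storage can also be disabled for an individual task even when the app has a backend configured, using Celery's ignore_result task option. A small sketch (the notify task is just an illustration):

```python
from celery import Celery

app = Celery('demo1',
             broker='amqp://demo:demo@localhost:5672/demo1',
             backend='rpc://')

# This task's return value is never sent to the result backend,
# even though a backend is configured on the app.
@app.task(ignore_result=True)
def notify(msg):
    print msg
```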
Next, prepare the celery client code. Create a file client.py with the following content:
```python
from celery import Celery
import demo1
import time

app1 = Celery('demo1',
              broker='amqp://demo:demo@localhost:5672/demo1',
              backend='rpc://')

x = demo1.add.delay(1, 2)
print "Ready? ", x.ready()
print "Result: ", x.result

time.sleep(3)
print "Ready? ", x.ready()
print "Result: ", x.result
```
Now start the celery worker:
```
[root@centos1 celery]# celery -A demo1 worker --loglevel=info
/usr/lib/python2.7/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@centos1 v4.2.0 (windowlicker)
---- **** -----
--- * ***  * -- Linux-3.10.0-229.14.1.el7.x86_64-x86_64-with-centos-7.1.1503-Core 2018-08-14 06:54:40
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         demo1:0x2af55d0
- ** ---------- .> transport:   amqp://demo:**@localhost:5672/demo1
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . demo1.add
  . demo1.sub

[2018-08-14 06:54:40,230: INFO/MainProcess] Connected to amqp://demo:**@127.0.0.1:5672/demo1
[2018-08-14 06:54:40,249: INFO/MainProcess] mingle: searching for neighbors
[2018-08-14 06:54:41,282: INFO/MainProcess] mingle: all alone
[2018-08-14 06:54:41,298: INFO/MainProcess] celery@centos1 ready.
```
Run the celery client:
```
[root@centos1 celery]# python client.py
Ready?  False
Result:  None
Ready?  True
Result:  3
```
As the output shows, the first call to x.ready() returns False: the task has not finished yet, so x.result is still None. After a 3-second delay, the second call shows that the task has completed, and x.result returns the task's result.
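Instead of polling ready(), the client can also block until the result arrives. A minimal sketch using AsyncResult.get(), in the same context as client.py above:

```python
import demo1

x = demo1.add.delay(1, 2)
# get() blocks until the worker finishes (or the timeout expires,
# raising celery.exceptions.TimeoutError), then returns the result.
print "Result: ", x.get(timeout=10)
```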
The worker log confirms that the task executed successfully:
```
[2018-08-14 06:56:06,695: INFO/MainProcess] Received task: demo1.add[ceef3d4f-c546-4853-bbf6-3e4c050a9957]
[2018-08-14 06:56:08,734: INFO/ForkPoolWorker-2] Task demo1.add[ceef3d4f-c546-4853-bbf6-3e4c050a9957] succeeded in 2.036982472s: 3
```
In the example above, although the celery client does not run the task code itself, it still has to import the module that implements the tasks, which is not always desirable. Celery therefore also supports task operations performed directly over the AMQP protocol; for example, the app.send_task function can trigger a task by name. See the documentation for details.
Modify client.py as follows:
```python
from celery import Celery
import time

app1 = Celery('demo1',
              broker='amqp://demo:demo@localhost:5672/demo1',
              backend='rpc://')

x = app1.send_task('demo1.add', args=[1, 2], kwargs={})
print "Ready? ", x.ready()
print "Result: ", x.result

time.sleep(3)
print "Ready? ", x.ready()
print "Result: ", x.result
```
Run the client again; the result is the same as before:
```
[root@centos1 celery]# python client.py
Ready?  False
Result:  None
Ready?  True
Result:  3
```
Besides the asynchronous tasks shown above, Celery also supports periodic tasks (Periodic Tasks). These rely on a separate celery beat process that, according to the configured schedule, periodically sends tasks to the broker.
To run the demo1.add task once every 10 seconds, add the following code to demo1.py:
```python
app.conf.beat_schedule = {
    'add_every_10_seconds': {
        'task': 'demo1.add',
        'schedule': 10.0,
        'args': (1, 2),
    },
}
```
Restart the celery worker and start celery beat:
```
[root@centos1 celery]# celery -A demo1 beat
celery beat v4.2.0 (windowlicker) is starting.
__    -    ... __   -        _
LocalTime -> 2018-08-14 08:02:05
Configuration ->
    . broker -> amqp://demo:**@localhost:5672/demo1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)
```
The periodic task executions show up in the worker log:
```
[2018-08-14 08:12:05,666: INFO/MainProcess] Received task: demo1.add[4d08d394-2d19-48ce-bb4b-d39c58f42f47]
[2018-08-14 08:12:07,669: INFO/ForkPoolWorker-2] Task demo1.add[4d08d394-2d19-48ce-bb4b-d39c58f42f47] succeeded in 2.002474438s: 3
[2018-08-14 08:12:35,686: INFO/MainProcess] Received task: demo1.add[55837f1d-b7dc-4c08-965b-215d92132394]
[2018-08-14 08:12:37,692: INFO/ForkPoolWorker-2] Task demo1.add[55837f1d-b7dc-4c08-965b-215d92132394] succeeded in 2.003697262s: 3
[2018-08-14 08:13:05,704: INFO/MainProcess] Received task: demo1.add[8d8591de-52cd-4271-b079-5f94b7f41af1]
[2018-08-14 08:13:07,709: INFO/ForkPoolWorker-2] Task demo1.add[8d8591de-52cd-4271-b079-5f94b7f41af1] succeeded in 2.003211596s: 3
[2018-08-14 08:13:35,725: INFO/MainProcess] Received task: demo1.add[1f546d46-fd5d-4572-9712-5848459a4cea]
[2018-08-14 08:13:37,730: INFO/ForkPoolWorker-2] Task demo1.add[1f546d46-fd5d-4572-9712-5848459a4cea] succeeded in 2.003228644s: 3
```
Besides this simple interval-based schedule, Celery also accepts crontab-style schedules. When using crontab schedules, pay attention to the timezone configuration; see the documentation for the full list of parameters.
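For instance, the sketch below would fire at 7:30 every Monday, interpreted in the configured timezone (the Asia/Shanghai value and the app object from demo1.py are assumptions for illustration):

```python
from celery.schedules import crontab

# Beat interprets crontab schedules in this timezone (UTC by default).
app.conf.timezone = 'Asia/Shanghai'

app.conf.beat_schedule = {
    'add_every_monday_morning': {
        'task': 'demo1.add',
        # Every Monday at 7:30 in the configured timezone.
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (1, 2),
    },
}
```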
Below we use a crontab schedule to run demo1.add once per minute, modifying demo1.py to:
```python
from celery import Celery
from celery.schedules import crontab
import time

app = Celery('demo1',
             broker='amqp://demo:demo@localhost:5672/demo1',
             backend='rpc://')

app.conf.beat_schedule = {
    'every_minute': {
        'task': 'demo1.add',
        'schedule': crontab(minute='*/1'),
        'args': (1, 2),
    },
}

@app.task
def add(x, y):
    time.sleep(2)
    return x + y

@app.task
def sub(x, y):
    time.sleep(2)
    return x - y
```
Restart the worker and beat, then watch the worker log; the task executes roughly once per minute:
```
[2018-08-14 08:42:18,011: INFO/MainProcess] Received task: demo1.add[b532debb-49b4-4b9a-ac83-a3dad6ebe68c]
[2018-08-14 08:42:20,032: INFO/ForkPoolWorker-2] Task demo1.add[b532debb-49b4-4b9a-ac83-a3dad6ebe68c] succeeded in 2.019885588s: 3
[2018-08-14 08:43:00,039: INFO/MainProcess] Received task: demo1.add[9fa61f49-5bcb-47dc-9d18-7c921fd5ed18]
[2018-08-14 08:43:02,044: INFO/ForkPoolWorker-2] Task demo1.add[9fa61f49-5bcb-47dc-9d18-7c921fd5ed18] succeeded in 2.003939354s: 3
[2018-08-14 08:44:00,056: INFO/MainProcess] Received task: demo1.add[cd1f0f5b-9bea-4764-ad42-9a62102d419f]
[2018-08-14 08:44:02,062: INFO/ForkPoolWorker-2] Task demo1.add[cd1f0f5b-9bea-4764-ad42-9a62102d419f] succeeded in 2.00425067799s: 3
```
The scheduling activity can also be seen in beat's debug-level log:
```
[root@centos1 celery]# celery -A demo1 beat --loglevel=debug
celery beat v4.2.0 (windowlicker) is starting.
__    -    ... __   -        _
LocalTime -> 2018-08-14 08:42:46
Configuration ->
    . broker -> amqp://demo:**@localhost:5672/demo1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%DEBUG
    . maxinterval -> 5.00 minutes (300s)
[2018-08-14 08:42:46,250: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-08-14 08:42:46,251: INFO/MainProcess] beat: Starting...
[2018-08-14 08:42:46,259: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: every_minute demo1.add(1, 2) <crontab: */1 * * * * (m/h/d/dM/MY)>
[2018-08-14 08:42:46,259: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2018-08-14 08:42:46,259: DEBUG/MainProcess] beat: Waking up in 13.73 seconds.
[2018-08-14 08:43:00,003: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-08-14 08:43:00,024: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2018 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@centos1', 'platform': 'Erlang/OTP 21.0.3', 'version': '3.7.7'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2018-08-14 08:43:00,027: INFO/MainProcess] Scheduler: Sending due task every_minute (demo1.add)
[2018-08-14 08:43:00,030: DEBUG/MainProcess] using channel_id: 1
[2018-08-14 08:43:00,031: DEBUG/MainProcess] Channel open
[2018-08-14 08:43:00,037: DEBUG/MainProcess] demo1.add sent. id->9fa61f49-5bcb-47dc-9d18-7c921fd5ed18
[2018-08-14 08:43:00,037: DEBUG/MainProcess] beat: Waking up in 59.95 seconds.
[2018-08-14 08:44:00,051: INFO/MainProcess] Scheduler: Sending due task every_minute (demo1.add)
[2018-08-14 08:44:00,054: DEBUG/MainProcess] demo1.add sent. id->cd1f0f5b-9bea-4764-ad42-9a62102d419f
[2018-08-14 08:44:00,054: DEBUG/MainProcess] beat: Waking up in 59.94 seconds.
```
In real deployments we usually need to track task execution status and gather statistics. Celery ships with commands for monitoring workers and tasks.
The status command lists the online workers:
```
[root@centos1 celery]# celery -A demo1 status
celery@centos1: OK

1 node online.
```
The inspect active command shows the tasks a worker is currently executing; when the worker is idle it prints:
```
[root@centos1 celery]# celery -A demo1 inspect active
-> celery@centos1: OK
    - empty -
```
While the worker is executing a task, the information of the running task is displayed:
```
[root@centos1 celery]# celery -A demo1 inspect active
-> celery@centos1: OK
    * {u'args': u'[1, 2]', u'time_start': 1534227677.0447896, u'name': u'demo1.add', u'delivery_info': {u'priority': 0, u'redelivered': False, u'routing_key': u'celery', u'exchange': u''}, u'hostname': u'celery@centos1', u'acknowledged': True, u'kwargs': u'{}', u'type': u'demo1.add', u'id': u'92fd8829-b1a8-4239-95ac-aeabca724bc5', u'worker_pid': 25917}
```
The inspect stats command returns statistics about the worker. Its output is very rich; see the documentation for the meaning of each field:
```
[root@centos1 celery]# celery -A demo1 inspect stats
-> celery@centos1: OK
    {
        "broker": {
            "alternates": [],
            "connect_timeout": 4,
            "failover_strategy": "round-robin",
            "heartbeat": 120.0,
            "hostname": "127.0.0.1",
            "insist": false,
            "login_method": "AMQPLAIN",
            "port": 5672,
            "ssl": false,
            "transport": "amqp",
            "transport_options": {},
            "uri_prefix": null,
            "userid": "demo",
            "virtual_host": "demo1"
        },
        "clock": "343",
        "pid": 25909,
        "pool": {
            "max-concurrency": 2,
            "max-tasks-per-child": "N/A",
            "processes": [25916, 25917],
            "put-guarded-by-semaphore": false,
            "timeouts": [0, 0],
            "writes": {
                "all": "100.00%",
                "avg": "100.00%",
                "inqueues": {"active": 0, "total": 2},
                "raw": "1",
                "strategy": "fair",
                "total": 1
            }
        },
        "prefetch_count": 8,
        "rusage": {
            "idrss": 0,
            "inblock": 88,
            "isrss": 0,
            "ixrss": 0,
            "majflt": 1,
            "maxrss": 31160,
            "minflt": 15953,
            "msgrcv": 0,
            "msgsnd": 0,
            "nivcsw": 20,
            "nsignals": 0,
            "nswap": 0,
            "nvcsw": 502,
            "oublock": 8,
            "stime": 0.509435,
            "utime": 0.327057
        },
        "total": {"demo1.add": 1}
    }
```
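The same information is also available programmatically through the app's control interface. A short sketch, run from the same directory as demo1.py:

```python
from demo1 import app

# Inspect the cluster over the broker; this is the same mechanism
# the `celery inspect` command uses under the hood.
i = app.control.inspect()
print "Active: ", i.active()
print "Stats:  ", i.stats()
```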
In addition, flower, a related project of Celery, provides real-time web-based monitoring.
flower can be installed directly with pip:
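```
pip install flower
```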
Then start flower. By default it listens on 127.0.0.1:5555; the listening address can be set with the --address parameter:
```
celery -A demo1 flower --address=0.0.0.0 --port=5555
```
Visit flower's address in a browser to see its Dashboard page.
flower provides more than monitoring: it also exposes a REST API for running tasks, so tasks can be submitted directly via HTTP requests, for example:
```
[root@centos1 celery]# curl -X POST -d '{"args":[1,2]}' 'http://127.0.0.1:5555/api/task/async-apply/demo1.add?refresh=True'
{"task-id": "fde316ea-7aed-4a85-8255-06935c5b015a", "state": "PENDING"}
```
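The same call can be made from Python. A minimal sketch, assuming the requests library is installed and that your flower version exposes a /api/task/result/&lt;task-id&gt; endpoint for fetching results (check the flower API docs for your version):

```python
import requests

# Submit demo1.add(1, 2) through flower's task API.
resp = requests.post(
    'http://127.0.0.1:5555/api/task/async-apply/demo1.add',
    json={'args': [1, 2]},
)
task_id = resp.json()['task-id']

# Fetch the task result by id (assumed endpoint, see flower docs).
result = requests.get('http://127.0.0.1:5555/api/task/result/%s' % task_id)
print result.json()
```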
However, these APIs still have stability issues and are not recommended for production use. See the documentation for API details.