Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
是基于Python实现的模块, 用于执行异步定时周期任务的
其结构的组成是由:
- 用户任务 app
- 管道 broker 用于存储任务 官方推荐 redis rabbitMQ / backend 用于存储任务执行结果的
- 员工 worker
Celery的简单实例
part_one.py
:
1 | import time |
part_two.py
:
1 | from part_one import func |
part_three
:
1 | from celery.result import AsyncResult |
三个文件创建完成了,现在开始分析哪个文件是app,哪个文件是borker,哪个是worker;最终我们要执行的任务是在 part_one.py 中,也就是worker需要执行的任务,所以worker就是 part_one.py了
如何启动worker
Linux - celery worker -A s1 -l INFO
Windows:这里需要注意的是celery 4.0 已经不再对Windows操作系统提供支持了,也就是在windows环境下出现问题除非自己解决,否则官方是不会给你解决的
Windows - celery worker -A s1 -l INFO -P eventletps:
eventlet 是一个python的三方库 需要使用 pip安装 pip install eventlet
启动完成,其实在part_one.py当中,worker已经知道了自己的broker 和 backend 在哪里了
接下来就让异步任务开始执行吧,对了 part_two.py 中就是使用 delay 的方式来开始执行的异步任务
执行 part_two .py 得到了一个字符串 55a84ea3-afa4-4ab9-8650-40e156c07441 这个字符串儿就是异步任务的ID
然后通过part_three.py修改异步任务的ID来获取任务返回的结果
Celery项目目录
目录Celery_task这个名字可以随意起,但是一定要注意在这个目录下一定要有一个celery.py这个文件
celery.py
:
1 | from celery import Celery |
task_one.py
:
1 | from .celery import celery_task |
task_two.py
:
1 | import time |
这样Celery项目目录结构就已经做好了然后再 my_celery中调用
1 | from Celery_task.task_one import one |
PS:
启动Worker的时候无需再使用文件启动,直接启动你的Celery_task目录就行了
celery worker -A Celery_task -l INFO -P eventlet
这样celery就可以自动的去检索当前目录下所有的task了,通过Include这个参数逐一去寻找
Celery定时任务
我们还使用Celery_task这个示例来修改一下
my_celery中进行一下小修改
1 | from Celery_task.task_one import one |
Celery周期任务
首先要对Celery_task中的celery.py进行一点修改:
1 | from celery import Celery |
创建Worker的方式并没有发行变化,但是这里要注意的是,每间隔一定时间后需要生产出来任务给Worker去执行,这里需要一个生产者beat
celery beat -A Celery_task # 创建生产者 beat 你的 schedule 写在哪里,就要从哪里启动
celery worker -A Celery_task -l INFO -P eventlet
创建worker之后,每10秒就会由beat创建一个任务给Worker去执行