Celery 是一个异步任务队列。你可以使用它在你的应用上下文之外执行任务。总的想法就是你的应用程序可能需要执行任何消耗资源的任务都可以交给任务队列,让你的应用程序自由和快速地响应客户端请求。
使用 Celery 运行后台任务并不像在线程中这样做那么简单。但是好处多多,Celery 具有分布式架构,使你的应用易于扩展。一个 Celery 安装有三个核心组件:
Celery 客户端: 用于发布后台作业。当与 Flask 一起工作的时候,客户端与 Flask 应用一起运行。
Celery workers: 这些是运行后台作业的进程。Celery 支持本地和远程的 workers,因此你就可以在 Flask 服务器上启动一个单独的 worker,随后随着你的应用需求的增加而新增更多的 workers。
消息代理: 客户端通过消息队列和 workers 进行通信,Celery 支持多种方式来实现这些队列。最常用的代理就是 RabbitMQ 和 Redis
我这里测试的使用的中间件是redis,这里暂时不在赘述redis的安装
我们首先安装celery模块
pip install celery
为了更方便直观的初步了解如何使用,我这里引用的官网的例子,我只写了两个文件,一个tasks.py,一个main.py
tasks.py代码为:
from celery import Celery def make_celery(app): celery = Celery( app.import_name, backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery
main.py代码为
from flask import Flask from tasks import make_celery flask_app = Flask(__name__) flask_app.config.update( CELERY_BROKER_URL='redis://localhost:6379', CELERY_RESULT_BACKEND='redis://localhost:6379' ) celery = make_celery(flask_app) @celery.task() def add_together(a, b): return a + b if __name__ == "__main__": flask_app.run()
接着我们启动flask
python3 main.py
然后我们还需要启动一个接收任务的celery工人,将日志等级设置为info,这样能够更加详细显示的任务状态
celery -A main.celery worker --loglevel=info
接着我们直接去python3交互环境进行测试
from main import add_together #调用任务可以使用delay()方法 add_together.delay(4,5) #返回 <AsyncResult: 3c9ee26c-5209-4003-a5e0-f6c68293165e> r = add_together.delay(4,5) r.ready() # 查看任务状态,返回布尔值, 任务执行完成, 返回 True, 否则返回 False. r.wait() # 等待任务完成, 返回任务执行结果,很少使用; r.get(timeout=1) # 获取任务执行结果,可以设置等待时间 r.result # 任务执行结果. r.state # PENDING, START, SUCCESS,任务当前的状态 r.status # PENDING, START, SUCCESS,任务当前的状态 r.successful # 任务成功返回true r.traceback # 如果任务抛出了一个异常,你也可以获取原始的回溯信息
我们可以celery工人接收到了任务,并提示成功,打印了结果和所需时间
然后redis里面插入一条json字符串数据
当异步任务完成后我们还需要释放资源,官方给我们介绍使用get()方法或者forget()方法
那么我们可以通过ready()方法查看任务状态,当然我们也需要获取任务id
task_id = result.id #'9fe3eae6-6e63-4972-85c6-d9cc30f960b6'
具体操作是这样的
from celery.result import AsyncResult from main import app async = AsyncResult(id=task_id, app=app) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
更多的celery请查看http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html