flask使用celery创建异步定时任务
- 2025-04-11 18:02:43
- 开发
- 77
- shevechco
由于flask是同步的,所以再将耗时的任务交给celery去处理,这样就不会影响flask服务,今天我们就继续学习下使用celery创建异步的定时任务的方法。
目前项目结构是在apps目录下创建tasks目录,目录下创建4个.py文件,分别是
1.__init__.py #celery配置和实例
2.async_task.py #celery异步任务
3.scheduler_task.py #celery定时任务
4.check_task.py #任务检测
1.__init__.py文件内容如下:
#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
import sys
import os
'''
创建celery实例,更新配置,将flask上下文加入到celery中
执行方法:
windows: celery -A apps.tasks.celery worker -l info -P eventlet
linux: celery -A apps.tasks.celery worker -l info
'''
# celery_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
# sys.path.append(celery_path)
task_module = [
'apps.tasks.scheduler_task'
]
config = {
"broker_url" :'redis://127.0.0.1:6379/0',
"result_backend" : 'redis://127.0.0.1:6379/1',
"timezone" : 'Asia/Shanghai',
"enable_utc" : False,
"task_serializer" : 'json',
"result_serializer" : 'json',
"result_expires": 3600,
"beat_schedule": { #定时任务
"collect-log-veryday": {
"task": "apps.tasks.scheduler_task.count_visitors",
'schedule': crontab(hour=0, minute=30), #每天0时30分执行
'args': (10, 10) #count_visitors需要的参数
}
}
}
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
celery.conf.update(**config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
from apps.application import create_app
app = create_app()
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
在该文件进行celery的实例化,并配置定时任务,同时也加入flask上下文,这样就能在celery的任务中使用flask的上下文
2.async_task.py用于异步任务
因为我目前只创建了定时任务,所以该文件没有任何内容,后续如果有异步任务的需求,我再更新这里,目前只是预留了这个文件用于写异步任务。
3.scheduler_task.py用于定时任务,内容如下:
#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from apps.tasks import celery
'''
celery定时任务
celery -A apps.tasks beat -l info
'''
@celery.task()
def count_visitors(id, task_id):
print('执行了加法函数 {}'.format(id+task_id))
return {"code":200 , "num": id+task_id}
这里我写了一个demo,count_visistors函数,参数可以查看__init__.py中设置的定时任务的args,这个两个参数是从那里传过来的。
4.check_task.py用于任务检测,内容如下:
#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from celery.result import AsyncResult
from apps.tasks import celery
'''
celery 任务状态检测
'''
def check_task_status(task_id):
'''
任务的执行状态:
PENDING :等待执行
STARTED :开始执行
RETRY :重新尝试执行
SUCCESS :执行成功
FAILURE :执行失败
'''
result = AsyncResult(id=task_id, app=celery)
result_dict = {
'type': result.status,
'msg': '',
'data': None,
'code': 400
}
if result.status == 'PENDING':
result_dict['msg'] = '任务等待中'
elif result.status == 'STARTED':
result_dict['msg'] = '任务开始执行'
elif result.status == 'RETRY':
result_dict['msg'] = '任务重新尝试执行'
elif result.status == 'FAILURE':
result_dict['msg'] = '任务执行失败了'
elif result.status == 'SUCCESS':
result = result.get()
result_dict['msg'] = '任务执行成功'
result_dict['data'] = result
result_dict['code'] = 200
return result_dict
这个是用于后期视图函数中查询任务状态,那么只需要传入任务ID就可以查看任务执行情况。
接着我们来启动任务进行测试,在我的项目目录下执行
celery -A apps.tasks.celery worker -l info -P eventlet
由于是win10系统,如果是linux系统-P eventlet参数可以去掉。
由于是定时任务,我们还需要起一个定时任务的beat,命令如下:
celery -A apps.tasks beat -l info
这样就好了,可以等待定时任务执行。执行时会在窗口打印return和任务id,用这个ID我们可以去redis中查询任务执行情况。
注意上述结果存在redis的1号数据库中。
redis-cli
select 1
get celery-task-meta-6cceb62f-793f-4cfb-a882-e6f23fb6d0a3
可以看到celery窗口返回的任务ID都需要添加celery-task-meta-前缀。那么这样就说明我们的定时异步任务完成了。
内容版权声明:除非注明,否则皆为本站原创文章。