flask使用celery创建异步定时任务

  • 2025-04-11 18:02:43
  • 开发
  • 77
  • shevechco

由于flask是同步的,所以再将耗时的任务交给celery去处理,这样就不会影响flask服务,今天我们就继续学习下使用celery创建异步的定时任务的方法。

目前项目结构是在apps目录下创建tasks目录,目录下创建4个.py文件,分别是

1.__init__.py #celery配置和实例
2.async_task.py #celery异步任务
3.scheduler_task.py #celery定时任务
4.check_task.py #任务检测

1.__init__.py文件内容如下:

#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
import sys
import os
'''
创建celery实例,更新配置,将flask上下文加入到celery中
执行方法:
    windows:  celery -A apps.tasks.celery worker -l info -P eventlet
    linux:    celery -A apps.tasks.celery worker -l info
'''
# celery_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
# sys.path.append(celery_path)

task_module = [
    'apps.tasks.scheduler_task'
]

config = {
    "broker_url" :'redis://127.0.0.1:6379/0',
    "result_backend" : 'redis://127.0.0.1:6379/1',
    "timezone" : 'Asia/Shanghai',
    "enable_utc" : False,
"task_serializer" : 'json', "result_serializer" : 'json', "result_expires": 3600, "beat_schedule": { #定时任务 "collect-log-veryday": { "task": "apps.tasks.scheduler_task.count_visitors", 'schedule': crontab(hour=0, minute=30), #每天0时30分执行 'args': (10, 10) #count_visitors需要的参数 } } } celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module) celery.conf.update(**config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): from apps.application import create_app app = create_app() with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask

在该文件进行celery的实例化,并配置定时任务,同时也加入flask上下文,这样就能在celery的任务中使用flask的上下文

2.async_task.py用于异步任务

因为我目前只创建了定时任务,所以该文件没有任何内容,后续如果有异步任务的需求,我再更新这里,目前只是预留了这个文件用于写异步任务。

3.scheduler_task.py用于定时任务,内容如下:

#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from apps.tasks import celery
'''
celery定时任务
celery -A apps.tasks beat -l info
'''

@celery.task()
def count_visitors(id, task_id):
    print('执行了加法函数 {}'.format(id+task_id))
    return {"code":200 , "num": id+task_id}

这里我写了一个demo,count_visistors函数,参数可以查看__init__.py中设置的定时任务的args,这个两个参数是从那里传过来的。

4.check_task.py用于任务检测,内容如下:

#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from celery.result import AsyncResult
from apps.tasks import celery

'''
celery 任务状态检测
'''

def check_task_status(task_id):
    '''
    任务的执行状态:
        PENDING :等待执行
        STARTED :开始执行
        RETRY   :重新尝试执行
        SUCCESS :执行成功
        FAILURE :执行失败
    '''
    result = AsyncResult(id=task_id, app=celery)
    result_dict = {
        'type': result.status,
        'msg': '',
        'data': None,
        'code': 400
    }
    if result.status == 'PENDING':
        result_dict['msg'] = '任务等待中'
    elif result.status == 'STARTED':
        result_dict['msg'] = '任务开始执行'
    elif result.status == 'RETRY':
        result_dict['msg'] = '任务重新尝试执行'
    elif result.status == 'FAILURE':
        result_dict['msg'] = '任务执行失败了'
    elif result.status == 'SUCCESS':
        result = result.get()
        result_dict['msg'] = '任务执行成功'
        result_dict['data'] = result
        result_dict['code'] = 200
    return result_dict

这个是用于后期视图函数中查询任务状态,那么只需要传入任务ID就可以查看任务执行情况。

接着我们来启动任务进行测试,在我的项目目录下执行

celery -A apps.tasks.celery worker -l info -P eventlet

202504111816514365325851.png

由于是win10系统,如果是linux系统-P eventlet参数可以去掉。

由于是定时任务,我们还需要起一个定时任务的beat,命令如下:

celery -A apps.tasks beat -l info

202504111818522423340442.png

这样就好了,可以等待定时任务执行。执行时会在窗口打印return和任务id,用这个ID我们可以去redis中查询任务执行情况。

202504111820072889071493.png

注意上述结果存在redis的1号数据库中。

redis-cli
select 1
get celery-task-meta-6cceb62f-793f-4cfb-a882-e6f23fb6d0a3

202504111822131816494214.png

可以看到celery窗口返回的任务ID都需要添加celery-task-meta-前缀。那么这样就说明我们的定时异步任务完成了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.sulao.cn/post/1024

相关推荐