flask使用celery异步定时任务备份数据库
- 2025-04-15 15:11:51
- 开发
- 42
- shevechco
前几天我们学习了celery创建异步定时任务,没有写具体的业务,只是随便写了个函数,今天直接贴上最近写的一个业务数据库备份的实例,这个定时任务还包含一个nginx日志分析统计的功能,大部分配置还是基于之前的学习记录,可以查看这个笔记:https://sulao.cn/post/1024
还是之前那几个文件,我们__init__.py文件内容如下:
#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
'''
创建celery实例,更新配置,将flask上下文加入到celery中
执行方法:
windows: celery -A apps.tasks.celery worker -l info -P eventlet -c 1
linux: celery -A apps.tasks.celery worker -l info -c 1
'''
task_module = [
'apps.tasks.scheduler_task',
'apps.tasks.async_task'
]
config = {
"broker_url" :'redis://127.0.0.1:6379/0',
"result_backend" : 'redis://127.0.0.1:6379/1',
"timezone" : 'Asia/Shanghai',
"enable_utc" : False,
"task_serializer" : 'json',
"result_serializer" : 'json',
"result_expires": 3600,
"beat_schedule": { #定时任务
"collect-log-everyday": {
"task": "apps.tasks.scheduler_task.count_visitors",
"schedule": crontab(hour=0, minute=1) #每天0时1分统计分析nginx访客日志
},
"backup-db-everyweek":{
"task": "apps.tasks.scheduler_task.db_backup",
"schedule": crontab(day_of_week=0, hour=3, minute=0) #每周日的三点备份数据库
}
}
}
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
celery.conf.update(**config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
from apps.application import create_app
app = create_app()
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
然后就是定时任务函数使用的是db_bakcup,在scheduler_task.py这个文件内,代码如下:
#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from flask import current_app
from apps.tasks import celery
from apps.modules.Data import BVisitor
from collections import Counter
import subprocess
import time
import json
import os
'''
celery定时任务
执行方法:
celery -A apps.tasks beat -l info
'''
#前一天的访客统计分析,基于nginx日志
@celery.task(bind=True)
def count_visitors(self, yesterday_time_stamp=False):
start_time = time.time()
if not yesterday_time_stamp:
yesterday_time_stamp = int(time.time()) - 86400
yesterday_time_str = time.strftime("%d/%B/%Y", time.localtime(yesterday_time_stamp))
yesterday_time = time.strftime("%Y%m%d", time.localtime(yesterday_time_stamp))
count = BVisitor.select().where(BVisitor.collect_date==int(yesterday_time)).count()
if count > 0:
current_app.logger.warning("已采集过 {} 的数据, 跳过后续操作, 等待下一次采集分析...".format(yesterday_time))
return False
else:
nginx_time_format_str = "/".join([t[:3] for t in yesterday_time_str.split("/")])
exclude_arr = current_app.config["EXCLUDE_SUFFIX"].split("|")
try:
with open(current_app.config["NGINX_LOG_PATH"], "r") as f:
ip_list = []
while True:
line = f.readline()
line = line.strip("\n")
if not line:
break
else:
if nginx_time_format_str in line:
url = line.split(" ")[6]
try:
if url.rsplit(".")[1] not in exclude_arr:
ip_list.append(line.split(" - ")[0])
except Exception as e:
ip_list.append(line.split(" - ")[0])
counter = Counter(ip_list)
data = dict(
clicks=len(ip_list),
total_visitor=len(set(ip_list)),
task_id=self.request.id,
cost_time=round(time.time()-start_time, 2),
ip_list=json.dumps(sorted(counter.items(), key=lambda x: (-x[1], x[0]))),
collect_date=yesterday_time
)
visitor = BVisitor(**data)
visitor.save()
current_app.logger.info("{} 数据采集分析完成!".format(yesterday_time))
return data
except Exception as e:
current_app.logger.error("采集访客数据失败, {}".format(e))
return False
#定时备份数据库数据
@celery.task
def db_backup():
backup_time = time.strftime("%Y%m%d", time.localtime())
backup_name = "{}{}_{}.sql".format(current_app.config["DB_BACKUP_PATH"], current_app.config["DB_NAME"], backup_time)
if not os.path.exists(current_app.config["DB_BACKUP_PATH"]):
current_app.logger.info("数据备份目录不存在, 创建该目录!")
os.makedirs(current_app.config["DB_BACKUP_PATH"], mode=0o777)
cmd = "mysqldump -h {} -P {} -u {} -p{} {}>{}".format(
current_app.config["DB_HOST"],
current_app.config["DB_PORT"],
current_app.config["DB_USER"],
current_app.config["DB_PASS"],
current_app.config["DB_NAME"],
backup_name)
try:
code = subprocess.call(cmd, shell=True)
if int(code) == 0:
current_app.logger.info("数据备份成功, 备份路径为: {}".format(backup_name))
else:
current_app.logger.error("备份失败, Return code: {}".format(code))
except Exception as e:
current_app.logger.error("备份数据失败, {}".format(e))
然后下面就是效果图
然后可以在./backup下面看到备份的数据SQL文件。
内容版权声明:除非注明,否则皆为本站原创文章。