flask使用celery异步定时任务备份数据库

  • 2025-04-15 15:11:51
  • 开发
  • 42
  • shevechco

前几天我们学习了celery创建异步定时任务,没有写具体的业务,只是随便写了个函数,今天直接贴上最近写的一个业务数据库备份的实例,这个定时任务还包含一个nginx日志分析统计的功能,大部分配置还是基于之前的学习记录,可以查看这个笔记:https://sulao.cn/post/1024

还是之前那几个文件,我们__init__.py文件内容如下:

#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
'''
创建celery实例,更新配置,将flask上下文加入到celery中
执行方法:
    windows:  celery -A apps.tasks.celery worker -l info -P eventlet -c 1
    linux:    celery -A apps.tasks.celery worker -l info -c 1
'''

# Dotted paths of the modules that hold the task functions; the list is
# handed to Celery through ``include=`` when the app instance is created.
task_module = [
    'apps.tasks.{}'.format(module_name)
    for module_name in ('scheduler_task', 'async_task')
]

# Celery settings applied to the app instance via ``celery.conf.update``.
config = dict(
    broker_url='redis://127.0.0.1:6379/0',
    result_backend='redis://127.0.0.1:6379/1',
    timezone='Asia/Shanghai',
    enable_utc=False,
    task_serializer='json',
    result_serializer='json',
    result_expires=3600,
    # Periodic (beat) tasks.
    beat_schedule={
        # Every day at 00:01: analyse the nginx visitor log.
        "collect-log-everyday": dict(
            task="apps.tasks.scheduler_task.count_visitors",
            schedule=crontab(minute=1, hour=0),
        ),
        # Every Sunday at 03:00: back up the database.
        "backup-db-everyweek": dict(
            task="apps.tasks.scheduler_task.db_backup",
            schedule=crontab(minute=0, hour=3, day_of_week=0),
        ),
    },
)

# Create the Celery application (broker/backend taken from ``config``) and
# then load the full settings dictionary into it.
celery = Celery(
    'task',
    include=task_module,
    backend=config.get('result_backend'),
    broker=config.get('broker_url'),
)
celery.conf.update(**config)
class ContextTask(celery.Task):
    """Celery task base class that runs every task inside a Flask app context.

    The Flask application is built lazily on the first task call and then
    reused, instead of calling ``create_app()`` again for every task
    execution.
    """

    # Flask application shared by all tasks in this process; ``None`` until
    # the first task runs.
    _flask_app = None

    @staticmethod
    def _get_flask_app():
        """Return the cached Flask application, creating it on first use."""
        if ContextTask._flask_app is None:
            # Imported here rather than at module level, as in the original
            # code (presumably to avoid a circular import -- confirm).
            from apps.application import create_app
            ContextTask._flask_app = create_app()
        return ContextTask._flask_app

    def __call__(self, *args, **kwargs):
        """Run the task body with a Flask application context pushed."""
        with self._get_flask_app().app_context():
            return self.run(*args, **kwargs)


# Make every task declared on this Celery app use the context-aware base.
celery.Task = ContextTask

然后就是定时任务函数db_backup,它在scheduler_task.py这个文件内,代码如下:

#!/usr/bin/python3
#coding:utf-8
__author__ = 'yang.su'
from flask import current_app
from apps.tasks import celery
from apps.modules.Data import BVisitor
from collections import Counter
import subprocess
import time
import json
import os
'''
celery定时任务
执行方法:
    celery -A apps.tasks beat -l info
'''

# Visitor statistics for the previous day, based on the nginx access log.

# English month abbreviations as nginx writes them in its timestamps; kept
# as a fixed table so the match does not depend on the process locale.
_NGINX_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def _nginx_date_token(time_stamp):
    """Return the ``dd/Mon/YYYY`` date string for *time_stamp* (local time)."""
    day = time.localtime(time_stamp)
    return "{:02d}/{}/{:04d}".format(
        day.tm_mday, _NGINX_MONTHS[day.tm_mon - 1], day.tm_year)


def _is_counted_request(url, excluded_suffixes):
    """Return True unless *url* ends in one of the excluded file suffixes."""
    # Ignore the query string / fragment and look only at the last path
    # segment, so "/a.b/page" or "/x.png?v=1" are classified correctly.
    path = url.split("?", 1)[0].split("#", 1)[0]
    last_segment = path.rsplit("/", 1)[-1]
    if "." not in last_segment:
        return True
    return last_segment.rsplit(".", 1)[1] not in excluded_suffixes


def _collect_visitor_ips(lines, date_token, excluded_suffixes):
    """Return the client IP of every counted request logged on *date_token*."""
    ip_list = []
    for raw_line in lines:
        line = raw_line.strip("\n")
        if date_token not in line:
            continue
        fields = line.split(" ")
        if len(fields) <= 6:
            # Truncated / malformed entry: skip it instead of aborting the run.
            continue
        if _is_counted_request(fields[6], excluded_suffixes):
            ip_list.append(line.split(" - ")[0])
    return ip_list


@celery.task(bind=True)
def count_visitors(self, yesterday_time_stamp=False):
    """Analyse one day of the nginx access log and store the visitor summary.

    Args:
        yesterday_time_stamp: Unix timestamp inside the day to analyse. A
            falsy value (the default) means "24 hours before now".

    Returns:
        The dict of values saved to ``BVisitor`` on success; ``False`` when
        the day was already collected or when the collection failed.
    """
    start_time = time.time()
    if not yesterday_time_stamp:
        yesterday_time_stamp = int(time.time()) - 86400
    yesterday_time = time.strftime("%Y%m%d", time.localtime(yesterday_time_stamp))

    count = BVisitor.select().where(BVisitor.collect_date==int(yesterday_time)).count()
    if count > 0:
        current_app.logger.warning("已采集过 {} 的数据, 跳过后续操作, 等待下一次采集分析...".format(yesterday_time))
        return False

    # Full four-digit year, so the same day of another year never matches.
    nginx_time_format_str = _nginx_date_token(yesterday_time_stamp)
    exclude_arr = current_app.config["EXCLUDE_SUFFIX"].split("|")
    try:
        # errors="replace": undecodable bytes in a request line must not
        # abort the whole analysis.
        with open(current_app.config["NGINX_LOG_PATH"], "r", errors="replace") as f:
            ip_list = _collect_visitor_ips(f, nginx_time_format_str, exclude_arr)
        counter = Counter(ip_list)
        data = dict(
            clicks=len(ip_list),
            total_visitor=len(counter),
            task_id=self.request.id,
            cost_time=round(time.time()-start_time, 2),
            # Most frequent IP first; ties ordered by IP string.
            ip_list=json.dumps(sorted(counter.items(), key=lambda x: (-x[1], x[0]))),
            collect_date=yesterday_time
        )
        visitor = BVisitor(**data)
        visitor.save()
        current_app.logger.info("{} 数据采集分析完成!".format(yesterday_time))
        return data
    except Exception as e:
        # Task boundary: log the failure and report it through the return value.
        current_app.logger.error("采集访客数据失败, {}".format(e))
        return False

# Scheduled database backup.
@celery.task
def db_backup():
    """Dump the configured MySQL database to a dated ``.sql`` file.

    Reads ``DB_BACKUP_PATH``, ``DB_NAME``, ``DB_HOST``, ``DB_PORT``,
    ``DB_USER`` and ``DB_PASS`` from the Flask config. The dump is written
    to a temporary file first and only moved to its final name when
    ``mysqldump`` exits with code 0, so a failed run never leaves a partial
    file under the backup name or overwrites an earlier good backup.

    Returns:
        None. The outcome is reported through the application logger.
    """
    config = current_app.config
    backup_dir = config["DB_BACKUP_PATH"]
    backup_time = time.strftime("%Y%m%d", time.localtime())
    # os.path.join works whether or not DB_BACKUP_PATH ends with a separator.
    backup_name = os.path.join(
        backup_dir, "{}_{}.sql".format(config["DB_NAME"], backup_time))

    if not os.path.exists(backup_dir):
        current_app.logger.info("数据备份目录不存在, 创建该目录!")
        # exist_ok guards against the directory appearing between the check
        # and the call.
        # NOTE(review): 0o777 (before umask) is very permissive for a
        # directory holding database dumps -- consider tightening.
        os.makedirs(backup_dir, mode=0o777, exist_ok=True)

    # Argument list + shell=False: config values are never interpreted by a
    # shell, so special characters in the password or path are safe.
    cmd = [
        "mysqldump",
        "-h", str(config["DB_HOST"]),
        "-P", str(config["DB_PORT"]),
        "-u", str(config["DB_USER"]),
        # --password=<value> instead of -p<value>: an empty password then
        # means "no password" rather than an interactive prompt.
        "--password={}".format(config["DB_PASS"]),
        str(config["DB_NAME"]),
    ]
    tmp_name = backup_name + ".tmp"
    try:
        with open(tmp_name, "wb") as dump_file:
            code = subprocess.call(cmd, stdout=dump_file)
        if int(code) == 0:
            os.replace(tmp_name, backup_name)
            current_app.logger.info("数据备份成功, 备份路径为: {}".format(backup_name))
        else:
            current_app.logger.error("备份失败, Return code: {}".format(code))
    except Exception as e:
        current_app.logger.error("备份数据失败, {}".format(e))
    finally:
        # Best-effort removal of a leftover partial dump; after a successful
        # os.replace the temporary file no longer exists.
        if os.path.exists(tmp_name):
            try:
                os.remove(tmp_name)
            except OSError as e:
                current_app.logger.warning("{}".format(e))

然后下面就是效果图

202504151515431063633185.png

然后可以在./backup下面看到备份的数据SQL文件。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.sulao.cn/post/1025

相关推荐