Python下PyMySQL操作类

项目中用到,网上找的都不太完善,这里记录下,按自己需要稍微做了修改

类文件mysql.py

#!/usr/local/bin/python3
#coding:utf-8
__author__ = 'merci'

from flask import Flask
import pymysql
app = Flask(__name__)
app.config.from_pyfile('../config.py')
HOST = app.config.get('HOST')
PORT = app.config.get('PORT')
USERNAME = app.config.get('USERNAME')
PASSWORD = app.config.get('PASSWORD')
DBNAME = app.config.get('DBNAME')
CHARSET = app.config.get('CHARSET')

class database:

    #初始化
    def __init__(self):
        self._host = HOST
        self._port = PORT
        self._username = USERNAME
        self._password = PASSWORD
        self._dbname = DBNAME
        self._charset = CHARSET
        self._conn = self.connectMysql()
        self._cur = self._conn.cursor()

    #连接数据库
    def connectMysql(self):
        connection = False
        try:
            connection = pymysql.connect(
                host = self._host,
                port = self._port,
                user = self._username,
                passwd = self._password,
                db = self._dbname,
                charset = self._charset
            )
        except Exception as e:
            print("Connect mysql failed , %s" % e)
            connection = False
        return connection

    #查询所有
    def fetch_all(self, sql):
        res = ''
        if self._conn:
            try:
                self._cur.execute(sql)
                res = self._cur.fetchall()
                self.close()
            except Exception as e:
                res = False
                print("Select error , %s" % e)
        return res

    #单个查询
    def fetch_one(self, sql):
        res = ''
        if self._conn:
            try:
                self._cur.execute(sql)
                res = self._cur.fetchone()
                self.close()
            except Exception as e:
                res = False
                print("Select error , %s" % e)
        return res

    #更新操作
    def update(self, sql):
        flag = False
        if self._conn:
            try:
                self._cur.execute(sql)
                self._conn.commit()
                flag = True
                self.close()
            except Exception as e:
                flag = False
                print("Update error , %s" % e)
        return flag

    #关闭数据库
    def close(self):
        if self._conn:
            try:
                if type(self._cur) == 'object':
                    self._cur.close()
                if type(self._conn) == 'object':
                    self._conn.close()
            except Exception as e:
                print("Cursor and Databse connection not close , %s %s %s" % (e, type(self._cur), type(self._conn)))

然后在我的手机版直接全复制上来吧

#!/usr/local/bin/python3
#coding:utf-8
__author__ = 'merci'
'''手机版'''

from flask import Flask,Blueprint,request,jsonify,render_template
from urllib import request as rq
from aip import AipNlp
from common import formatTime,getThumb,getPage
from model.mysql import database
import pymysql
import math
import json
app = Flask(__name__)
#定义手机版蓝图
wap = Blueprint('wap', __name__)
db = database()

#首页
@wap.route('/')
@wap.route('/index')
def index():
    #SEO
    site = seo()
    #导航
    nav = navigation()
    #热门关键词
    hot = hotkeywords()
    #最新
    sql = """SELECT * FROM `v9_picture` WHERE `status`=99 ORDER BY `id` DESC LIMIT 10"""
    new = db.fetch_all(sql)
    return render_template('index.html', site=site, nav=nav, new=new, hot=hot)

#首页ajax下拉加载
@wap.route('/ajaxpic', methods=['POST','GET'])
def ajaxPic():
    page = request.form.get('page')
    #page页数
    if not page:
        page = 1
    else:
        page = int(page)
    #第一次显示数量
    pagebase = 10
    #每次下拉增加数量
    pagenum = 6
    #limit分页数
    limit = pagebase + (page * pagenum)
    sql = """SELECT COUNT(*) FROM `v9_picture` WHERE `status`=99"""
    total = db.fetch_one(sql)
    #总页数
    pagetotal = math.ceil(total[0]/pagenum)
    content = {}
    #防止下拉多次崩溃
    if pagetotal > 1 or page <= 10:
        sql2 = """SELECT * FROM `v9_picture` WHERE `status`=99 ORDER BY `id` DESC LIMIT %d,%d""" % (limit, pagenum)
        result = db.fetch_all(sql2)
        datas = []
        for val in result:
            datas.append({'id':val[0], 'title':val[3], 'thumb':getThumb(val[5]), 'time':formatTime(val[15])})
        content['code'] = 1
        content['msg'] = 'OK'
        #最多只让拉10次
        if pagetotal > 10:
            content['pagetotal'] = 10
        else:
            content['pagetotal'] = pagetotal
        content['list'] = datas
    else:
        content = {'code': 0}   
    return jsonify(content)

#列表页
@wap.route('/catid/<int:catid>', methods=['POST','GET'])
def category(catid):
    catid = int(catid)
    page = request.args.get('page')
    #SEO
    site = seo()
    #导航
    nav = navigation()
    #热门关键词
    hot = hotkeywords()
    #每页数量
    pagenum = 10
    show_status = 0
    if not page:
        page = 1
    else:
        page = int(page)
        if page > 1:
            show_status = 1

    limit_start = (page-1)*10
    sql = """SELECT * FROM `v9_picture` WHERE `catid`=%d AND `status`=99 ORDER BY `id` DESC LIMIT %d,%d""" % (catid, limit_start, pagenum)
    result = db.fetch_all(sql)
    data_list = []
    for val in result:
        data_list.append({'id':val[0], 'title':val[3], 'thumb':getThumb(val[5]), 'time':formatTime(val[15])})
    #查询总数
    sql2 = """SELECT COUNT(*) FROM `v9_picture` WHERE `catid`=%d AND `status`=99""" % catid
    total = db.fetch_one(sql2)
    page_total = int(math.ceil(total[0]/pagenum))
    page_list = getPage(page_total, page)
    datas = {
        'catid' : catid,
        'data_list' : data_list,
        'page' : page,
        'page_total' : page_total,
        'show_status' : show_status,
        'page_list' : page_list
    }
    return render_template('category.html', site=site, nav=nav, datas=datas, hot=hot)

#内容页
@wap.route('/detail/<int:id>')
def detail(id):
    id = int(id)
    #SEO
    site = seo()
    #导航
    nav = navigation()
    #热门关键词
    hot = hotkeywords()
    sql = """SELECT * FROM `v9_picture` WHERE `id`=%d AND `status`=99""" % id
    result = db.fetch_one(sql)
    #所属分类
    sql2 = """SELECT * FROM `v9_category` WHERE `catid`=%d""" % result[1]
    result2 = db.fetch_one(sql2)
    cat = {'catid':result2[0], 'catname':result2[9]}

    sql3 = """SELECT * FROM `v9_picture_data` WHERE `id`=%d""" % id
    result3 = db.fetch_one(sql3)
    picturelist = json.loads(result3[9])
    #本类推荐
    sql4 = """SELECT * FROM `v9_picture` WHERE `posids`=1 AND `catid`=%d ORDER BY `id` DESC LIMIT 10""" % cat['catid']
    result4 = db.fetch_all(sql4)

    datas = {
        'id' : result[0],
        'title' : result[3],
        'digg' : result[17],
        'thumb' : picturelist["0"],
        'total' : len(picturelist)
    }
    return render_template('detail.html', site=site, nav=nav, cat=cat, hot=hot, datas=datas, rec=result4)

@wap.route('/detailimg', methods=['POST','GET'])
def detailImg():
    id = request.args.get('id')
    sql = """SELECT * FROM `v9_picture_data` WHERE `id`=%d""" % int(id)
    result = db.fetch_one(sql)
    return jsonify(json.loads(result[9]))

#搜索页
@wap.route('/search', methods=['POST','GET'])
def search():
    keywords = request.args.get('q')
    page = request.args.get('page')
    #SEO
    site = seo()
    #导航
    nav = navigation()
    #热门关键词
    hot = hotkeywords()
    #每页数量
    pagenum = 10
    show_status = 0
    if not page:
        page = 1
    else:
        page = int(page)
        if page > 1:
            show_status = 1

    limit_start = (page-1)*10
    sql = """SELECT * FROM v9_picture WHERE `title` LIKE '%%%s%%' LIMIT %d,%d""" % (keywords, limit_start, pagenum)
    result = db.fetch_all(sql)  
    sql2 = """SELECT COUNT(*) FROM v9_picture WHERE `title` LIKE '%%%s%%'""" % keywords
    total = db.fetch_one(sql2)
    page_total = int(math.ceil(total[0]/pagenum))
    page_list = getPage(page_total, page)
    data_list = []
    for val in result:
        data_list.append({'id':val[0], 'title':val[3], 'thumb':getThumb(val[5]), 'time':formatTime(val[15])})
    datas = {
        'q' : keywords,
        'data_list' : data_list,
        'page' : page,
        'page_total' : page_total,
        'show_status' : show_status,
        'page_list' : page_list
    }
    return render_template('search.html', site=site, nav=nav, datas=datas, hot=hot)

#SEO
def seo():
    sql = """SELECT * FROM `v9_site`"""
    site = db.fetch_one(sql)
    return site

#导航
def navigation():
    sql = """SELECT * FROM `v9_category` WHERE 1 ORDER BY `listorder` ASC"""
    nav = db.fetch_all(sql)
    navlist = []
    for val in nav:
        navlist.append({'catid':val[0], 'catname':val[9]})
    return navlist

#热门关键词
def hotkeywords():
    sql = """SELECT * FROM `v9_keyword` WHERE 1 ORDER BY `videonum` DESC LIMIT 50"""
    hot = db.fetch_all(sql)
    return hot

#浏览数
@wap.route('/hits/<int:id>', methods=['POST','GET'])
def hits(id):
    sql = """SELECT `views` FROM `v9_hits` WHERE `hitsid`=CONCAT('c-3-',%d)""" % int(id)
    hits = db.fetch_one(sql)
    sql2 = """UPDATE `v9_hits` SET `views`=views+1 WHERE `hitsid`=CONCAT('c-3-',%d)""" % int(id)
    db.update(sql2)
    hit = """
    layui.use('jquery',function(){
        var $ = layui.jquery;
        $('#hits').text(%s);
      });
    """ % hits[0]
    return hit

@wap.route('/enjoy', methods=['POST','GET'])
def enjoy():
    id = request.args.get('id')
    sql = """UPDATE v9_picture SET `digg`=digg+1 WHERE `id`=%d""" % int(id)
    status = db.update(sql)
    if status:
        return '谢谢你的参与!'
    else:
        return '数据错误,请稍后重试!'


内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://sulao.cn/post/571.html

我要评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。