欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

Python 封装DBUtils 和pymysql实例

程序员文章站 2022-04-27 20:44:29
...
  之前一篇Python 封装DBUtils 和pymysql 中写过一个basedao.py,最近几天又重新整理了下思绪,优化了下 basedao.py,目前支持的方法还不多,后续会进行改进、添加。

  主要功能:

    1.查询单个对象:

      所需参数:表名,过滤条件

    2.查询多个对象:
      所需参数:表名,过滤条件

    3.按主键查询:
      所需参数:表名,值

    4.分页查询:
      所需参数:表名,页码,每页记录数,过滤条件

  具体代码如下:  

  1 import json, os, sys, time  2   3 import pymysql  4 from DBUtils import PooledDB  5   6 class BaseDao(object):  7     """  8     简便的数据库操作基类  9     """ 10     __config = {}                   # 数据库连接配置 11     __conn = None                   # 数据库连接 12     __cursor = None                 # 数据库游标 13     __database = None               # 用于临时村塾查询数据库 14     __tableName = None              # 用于临时存储查询表名 15     __fields = []                   # 用于临时存储查询表的字段列表 16     __primaryKey_dict = {}          # 用于存储配置中的数据库中所有表的主键 17  18     def __init__(self, creator=pymysql, host="localhost", user=None, password="", database=None, port=3306, charset="utf8"): 19         if host is None: 20             raise Exception("Parameter [host] is None.") 21         if user is None: 22             raise Exception("Parameter [user] is None.") 23         if password is None: 24             raise Exception("Parameter [password] is None.") 25         if database is None: 26             raise Exception("Parameter [database] is None.") 27         if port is None: 28             raise Exception("Parameter [port] is None.") 29         self.__config = dict({ 30             "creator" : creator, "charset":charset, 31             "host":host, "port":port, 
 32             "user":user, "password":password, "database":database 33         }) 34         self.__conn = PooledDB.connect(**self.__config) 35         self.__cursor = self.__conn.cursor() 36         self.__database = self.__config["database"] 37         self.__init_primaryKey() 38         print(get_time(), "数据库连接初始化成功。") 39          40     def __del__(self): 41         '重写类被清除时调用的方法' 42         if self.__cursor: 43             self.__cursor.close() 44             print(get_time(), "游标关闭") 45         if self.__conn: 46             self.__conn.close() 47             print(get_time(), "连接关闭") 48  49     def select_one(self, tableName=None, filters={}): 50         ''' 51         查询单个对象 52         @tableName 表名 53         @filters 过滤条件 54         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 55         ''' 56         self.__check_params(tableName) 57         sql = self.__query_util(filters) 58         self.__cursor.execute(sql) 59         result = self.__cursor.fetchone() 60         return self.__parse_result(result) 61  62     def select_pk(self, tableName=None, primaryKey=None): 63         ''' 64         按主键查询 65         @tableName 表名 66         @primaryKey 主键值 67         ''' 68         self.__check_params(tableName) 69         filters = {} 70         filters.setdefault(str(self.__primaryKey_dict[tableName]), primaryKey) 71         sql = self.__query_util(filters) 72         self.__cursor.execute(sql) 73         result = self.__cursor.fetchone() 74         return self.__parse_result(result) 75          76     def select_all(self, tableName=None, filters={}): 77         ''' 78         查询所有 79         @tableName 表名 80         @filters 过滤条件 81         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 82         ''' 83         self.__check_params(tableName) 84         sql = self.__query_util(filters) 85         self.__cursor.execute(sql) 86         results = self.__cursor.fetchall() 87         return self.__parse_results(results) 88  89     def count(self, tableName=None): 90         ''' 91         统计记录数 92         ''' 93         self.__check_params(tableName) 94         sql = "SELECT count(*) FROM %s"%(self.__tableName) 95         self.__cursor.execute(sql) 96         result = self.__cursor.fetchone() 97         return result[0] 98  99     def select_page(self, tableName=None, pageNum=1, limit=10, filters={}):100         '''101         分页查询102         @tableName 表名103         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value104         '''105         self.__check_params(tableName)106         totalCount = self.count()107         if totalCount / limit == 0 :108             totalPage = totalCount / limit109         else:110             totalPage = totalCount // limit + 1111         if pageNum > totalPage:112             print("最大页数为%d"%totalPage)113             pageNum = totalPage114         elif pageNum < 1:115             print("页数不能小于1")116             pageNum = 1117         beginindex = (pageNum-1) * limit118         filters.setdefault("_limit_", (beginindex, limit))119         sql = self.__query_util(filters)120         self.__cursor.execute(sql)121         results = self.__cursor.fetchall()122         return self.__parse_results(results)123 124     def __parse_result(self, result):125         '用于解析单个查询结果,返回字典对象'126         obj = {}127         for k,v in zip(self.__fields, result):128             obj[k] = v129         return obj130 131     def __parse_results(self, results):132         '用于解析多个查询结果,返回字典列表对象'133         objs = []134         for result in results:135             obj = self.__parse_result(result)136             objs.append(obj)137         return objs138 139     def __init_primaryKey(self):140         '根据配置中的数据库读取该数据库中所有表的主键集合'141         sql = """SELECT TABLE_NAME, COLUMN_NAME142                 FROM  Information_schema.columns143                 WHERE COLUMN_KEY='PRI' AND TABLE_SCHEMA='%s'"""%(self.__database)144         self.__cursor.execute(sql)145         results = self.__cursor.fetchall()146         for result in results:147             self.__primaryKey_dict[result[0]] = result[1]148 149     def __query_fields(self, tableName=None, database=None):150         '查询表的字段列表, 将查询出来的字段列表存入 __fields 中'151         sql = """SELECT column_name152                 FROM  Information_schema.columns153                 WHERE table_Name = '%s' AND TABLE_SCHEMA='%s'"""%(tableName, database)154         self.__cursor.execute(sql)155         fields_tuple = self.__cursor.fetchall()156         self.__fields = [fields[0] for fields in fields_tuple]157 158     def __query_util(self, filters=None):159         """160         SQL 语句拼接方法161         @filters 过滤条件162         """163         sql = r'SELECT #{FIELDS} FROM #{TABLE_NAME} WHERE 1=1 #{FILTERS}'164         # 拼接查询表165         sql = sql.replace("#{TABLE_NAME}", self.__tableName)166         # 拼接查询字段167         self.__query_fields(self.__tableName, self.__database)168         FIELDS = ""169         for field in self.__fields:170             FIELDS += field + ", "171         FIELDS = FIELDS[0: len(FIELDS)-2]172         sql = sql.replace("#{FIELDS}", FIELDS)173         # 拼接查询条件(待优化)174         if filters is None:175             sql = sql.replace("#{FILTERS}", "")176         else:177             FILTERS =  ""178             if not isinstance(filters, dict):179                 raise Exception("Parameter [filters] must be dict type. ")180             isPage = False181             if filters.get("_limit_"):182                 isPage = True183                 beginindex, limit = filters.get("_limit_")184             for k, v in filters.items():185                 if k.startswith("_in_"):                # 拼接 in186                     FILTERS += "AND %s IN (" %(k[4:])187                     values = v.split(",")188                     for value in values:189                         FILTERS += "%s,"%value190                     FILTERS = FILTERS[0:len(FILTERS)-1] + ") "191                 elif k.startswith("_nein_"):            # 拼接 not in192                     FILTERS += "AND %s NOT IN (" %(k[4:])193                     values = v.split(",")194                     for value in values:195                         FILTERS += "%s,"%value196                     FILTERS = FILTERS[0:len(FILTERS)-1] + ") "197                 elif k.startswith("_like_"):            # 拼接 like198                     FILTERS += "AND %s like '%%%s%%' " %(k[6:], v)199                 elif k.startswith("_ne_"):              # 拼接不等于200                     FILTERS += "AND %s != '%s' " %(k[4:], v)201                 elif k.startswith("_lt_"):              # 拼接小于202                     FILTERS += "AND %s < '%s' " %(k[4:], v)203                 elif k.startswith("_le_"):              # 拼接小于等于204                     FILTERS += "AND %s <= '%s' " %(k[4:], v)205                 elif k.startswith("_gt_"):              # 拼接大于206                     FILTERS += "AND %s > '%s' " %(k[4:], v)207                 elif k.startswith("_ge_"):              # 拼接大于等于208                     FILTERS += "AND %s >= '%s' " %(k[4:], v)209                 elif k in self.__fields:                # 拼接等于210                     FILTERS += "AND %s = '%s' "%(k, v)211             sql = sql.replace("#{FILTERS}", FILTERS)212             if isPage:213                 sql += "LIMIT %d,%d"%(beginindex, limit)214 215         print(get_time(), sql)216         return sql217 218     def __check_params(self, tableName):219         '''220         检查参数221         '''222         if tableName:223             self.__tableName = tableName224         else:225             if self.__tableName is None:226                 raise Exception("Parameter [tableName] is None.")227 228 def get_time():229     return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())230 231 if __name__ == "__main__":232     config = {233         # "creator": pymysql,234         # "host" : "127.0.0.1", 235         "user" : "root", 
236         "password" : "root",237         "database" : "test", 
238         # "port" : 3306,239         # "charset" : 'utf8'240     }241     base = BaseDao(**config)242     ########################################################################243     # user = base.select_one("user")244     # print(user)245     ########################################################################246     # users = base.select_all("user")247     # print(users)248     ########################################################################249     # filter1 = {250     #     "sex":0,251     #     "_in_id":"1,2,3,4,5",252     #     "_like_name":"zhang",253     #     "_ne_name":"wangwu"254     # }255     # user_filters = base.select_all(tableName="user", filters=filter1)256     # print(user_filters)257     ########################################################################258     # menu = base.select_one(tableName="menu")259     # print(menu)260     ########################################################################261     # user_pk = base.select_pk("user", 2)262     # print(user_pk)263     ########################################################################264     # filter2 = {265     #     "_in_id":"1,2,3,4",266     #     "_like_name":"test"267     # }268     # user_limit = base.select_page("user", 2, 10, filter2)  #未实现269     # print(user_limit)270     ########################################################################
View Code

  代码中已经给出了几个具体示例,大家可以参考使用。  

以上就是Python 封装DBUtils 和pymysql实例的详细内容,更多请关注其它相关文章!