暂无图片
暂无图片
2
暂无图片
暂无图片
暂无图片

mysql python 操作工具类

原创 袁长刚 2021-11-02
2134

python 脚本 | mysql 常用操作工具类

重新优化了一版本,支持多数据源操作

  • 使用例子参考,如下:
mysql_enmotech_conf_1 = { 'host': '192.168.101.101', 'port': 3306, 'user': 'enmotech', 'passwd': 'xxxxxx', 'db': 'modb' } mysql_enmotech_conf_2 = { 'host': '192.168.101.102', 'port': 3306, 'user': 'enmotech', 'passwd': 'xxxxxx', 'db': 'modb' } # 普通查询 _bool,sql_result,utime = MYSQL.query(sql="select * from user where id = %s",args=[12938674],connect=mysql_enmotech_conf_1,result_dict=True) # 多数据源查询,直接传入连接串就行 _bool,sql_result,utime = MYSQL.query(sql="select * from user where id = %s",args=[12938674],connect=mysql_enmotech_conf_1,result_dict=True) _bool,sql_result,utime = MYSQL.query(sql="select * from user where id = %s",args=[12938674],connect=mysql_enmotech_conf_2,result_dict=True) # 事物操作 conn,cursor = MYSQL.begin(connect=mysql_enmotech_conf_1) t1 = MYSQL.begin_dml(cursor,sql="insert into test(id) values (%s)",args=123) t2 = MYSQL.begin_dml(cursor,sql="insert into test(id) values (%s)",args=456) MYSQL.commit(conn,cursor)
  • 脚本文件如下:
#!/usr/bin/env python3 # -*- encoding: utf-8 -*- ''' @文件 : mysql_utils.py @时间 : 2021/12/22 10:19:29 @作者 : Spanner @版本 : 0.1 @说明 : Python Mysql操作类 ''' import pymysql from dbutils.pooled_db import PooledDB import time import hashlib # 计算函数执行时间,返回秒数和结果 def func_used_time(func): def wrapper(*args,**kwargs): start_time = time.time() _bool,result = func(*args,**kwargs) end_time = time.time() used_seconds = round(end_time-start_time,2) return _bool,result,used_seconds return wrapper # md5 def mk_md5(value): if value: m = hashlib.md5() m.update(str(value).encode('utf8')) return m.hexdigest() else: return None # Mysql操作类 class MYSQL(object): _pool_dict = {} _connect_list = [] @staticmethod def pool_init(kwargs): return PooledDB( creator=pymysql, maxconnections=1000, blocking=True, host = kwargs.get('host'), port = kwargs.get('port'), user = kwargs.get('user'), password = kwargs.get('passwd'), database = kwargs.get('db'), charset='utf8' ) @classmethod def dbpool(cls,kwargs,kwargs_md5): if kwargs_md5 not in cls._connect_list: cls._connect_list.append(kwargs_md5) cls._pool_dict[f'{kwargs_md5}'] = cls.pool_init(kwargs) elif not cls._pool_dict.get(f'{kwargs_md5}'): cls.pool_init(kwargs) @classmethod def dbconnect(cls,connect=dict,result_dict=bool): kwargs_md5 = mk_md5(f"{connect}") cls.dbpool(connect,kwargs_md5) conn = cls._pool_dict.get(f'{kwargs_md5}').connection() if result_dict: cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 返回dict else: cursor = conn.cursor() #返回list return conn, cursor @staticmethod def dbconnect_close(conn, cursor): if cursor and conn: cursor.close() conn.close() @classmethod @func_used_time def query(cls,connect=dict,result_dict=True,sql=None, args=None): ''' # 查询 - 返回:True-成功,False-失败,result-数据集/异常,time-耗时 ''' try: conn, cursor = cls.dbconnect(connect,result_dict) cursor.execute(sql, args) result = cursor.fetchall() if cursor.rowcount > 0: return True,result else: return False,None except Exception as ex: return False,ex finally: cls.dbconnect_close(conn,cursor) @classmethod @func_used_time def insert(cls,connect=dict,result_dict=True,sql=None, args=None): ''' # 单条写入 - 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时 ''' try: conn, cursor = cls.dbconnect(connect,result_dict) rows = cursor.execute(sql, args) conn.commit() if rows: if rows > 0: return True,rows return False,rows except Exception as ex: return False,ex finally: cls.dbconnect_close(conn, cursor) @classmethod @func_used_time def insert_many(cls,connect=dict,result_dict=True,sql=None, args=None): ''' # 批量写入 - 参数:传入列表或元组 - 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时 ''' try: conn, cursor = cls.dbconnect(connect,result_dict) rows = cursor.executemany(sql, args) conn.commit() if rows: if rows > 0: return True,rows return False,rows except Exception as ex: return False,ex finally: cls.dbconnect_close(conn, cursor) @classmethod @func_used_time def update(cls,connect=dict,result_dict=True,sql=None, args=None): ''' # 更新 - 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时 ''' try: conn, cursor = cls.dbconnect(connect,result_dict) rows = cursor.execute(sql, args) conn.commit() return True,rows except Exception as ex: return False,ex finally: cls.dbconnect_close(conn, cursor) @classmethod @func_used_time def delete(cls,connect=dict,result_dict=True,sql=None, args=None): ''' # 删除 - 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时 ''' try: conn, cursor = cls.dbconnect(connect,result_dict) rows = cursor.execute(sql, args) conn.commit() return True,rows except Exception as ex: return False,ex finally: cls.dbconnect_close(conn, cursor) @classmethod @func_used_time def ddl(cls,connect=dict,sql=None): ''' # 执行ddl语句 - 返回:True-成功,False-失败,ex-异常,time-耗时 ''' try: conn, cursor = cls.dbconnect(connect) cursor.execute(sql) conn.commit() return True,None except Exception as ex: return False,ex finally: cls.dbconnect_close(conn, cursor) # 创建事物 @classmethod def begin(cls,connect=dict): ''' # 开启事物操作 - 返回:True-成功,False-失败,ex-异常,time-耗时 ''' conn, cursor = cls.dbconnect(connect) conn.begin() return conn,cursor @classmethod def begin_dml(cls,cursor=None,sql=None, args=None): ''' # 执行事物SQL - 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时 ''' try: rows = cursor.execute(sql, args) if rows: if rows > 0: return True,rows return True,None except Exception as ex: return False,ex @classmethod def commit(cls,conn=None,cursor=None): ''' # 提交事物 - 返回:True-成功,False-失败,ex-异常,time-耗时 ''' try: _commit = conn.commit() return True,_commit except Exception as ex: return False,ex finally: cls.dbconnect_close(conn, cursor) @classmethod def rollback(cls,conn=None,cursor=None): ''' # 回滚事物 - 返回:True-成功,False-失败,ex-异常,time-耗时 ''' try: _rollback = conn.rollback() return True,_rollback except Exception as ex: return False,ex finally: cls.dbconnect_close(conn, cursor)
最后修改时间:2021-12-29 17:21:56
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论