python 脚本 | mysql 常用操作工具类
重新优化了一版本,支持多数据源操作
- 使用例子参考,如下:
mysql_enmotech_conf_1 = {
'host': '192.168.101.101',
'port': 3306,
'user': 'enmotech',
'passwd': 'xxxxxx',
'db': 'modb'
}
mysql_enmotech_conf_2 = {
'host': '192.168.101.102',
'port': 3306,
'user': 'enmotech',
'passwd': 'xxxxxx',
'db': 'modb'
}
# 普通查询
_bool,sql_result,utime = MYSQL.query(sql="select * from user where id = %s",args=[12938674],connect=mysql_enmotech_conf_1,result_dict=True)
# 多数据源查询,直接传入连接串就行
_bool,sql_result,utime = MYSQL.query(sql="select * from user where id = %s",args=[12938674],connect=mysql_enmotech_conf_1,result_dict=True)
_bool,sql_result,utime = MYSQL.query(sql="select * from user where id = %s",args=[12938674],connect=mysql_enmotech_conf_2,result_dict=True)
# 事物操作
conn,cursor = MYSQL.begin(connect=mysql_enmotech_conf_1)
t1 = MYSQL.begin_dml(cursor,sql="insert into test(id) values (%s)",args=123)
t2 = MYSQL.begin_dml(cursor,sql="insert into test(id) values (%s)",args=456)
MYSQL.commit(conn,cursor)
- 脚本文件如下:
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
'''
@文件 : mysql_utils.py
@时间 : 2021/12/22 10:19:29
@作者 : Spanner
@版本 : 0.1
@说明 : Python Mysql操作类
'''
import pymysql
from dbutils.pooled_db import PooledDB
import time
import hashlib
# 计算函数执行时间,返回秒数和结果
def func_used_time(func):
def wrapper(*args,**kwargs):
start_time = time.time()
_bool,result = func(*args,**kwargs)
end_time = time.time()
used_seconds = round(end_time-start_time,2)
return _bool,result,used_seconds
return wrapper
# md5
def mk_md5(value):
if value:
m = hashlib.md5()
m.update(str(value).encode('utf8'))
return m.hexdigest()
else:
return None
# Mysql操作类
class MYSQL(object):
_pool_dict = {}
_connect_list = []
@staticmethod
def pool_init(kwargs):
return PooledDB(
creator=pymysql,
maxconnections=1000,
blocking=True,
host = kwargs.get('host'),
port = kwargs.get('port'),
user = kwargs.get('user'),
password = kwargs.get('passwd'),
database = kwargs.get('db'),
charset='utf8'
)
@classmethod
def dbpool(cls,kwargs,kwargs_md5):
if kwargs_md5 not in cls._connect_list:
cls._connect_list.append(kwargs_md5)
cls._pool_dict[f'{kwargs_md5}'] = cls.pool_init(kwargs)
elif not cls._pool_dict.get(f'{kwargs_md5}'):
cls.pool_init(kwargs)
@classmethod
def dbconnect(cls,connect=dict,result_dict=bool):
kwargs_md5 = mk_md5(f"{connect}")
cls.dbpool(connect,kwargs_md5)
conn = cls._pool_dict.get(f'{kwargs_md5}').connection()
if result_dict:
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 返回dict
else:
cursor = conn.cursor() #返回list
return conn, cursor
@staticmethod
def dbconnect_close(conn, cursor):
if cursor and conn:
cursor.close()
conn.close()
@classmethod
@func_used_time
def query(cls,connect=dict,result_dict=True,sql=None, args=None):
'''
# 查询
- 返回:True-成功,False-失败,result-数据集/异常,time-耗时
'''
try:
conn, cursor = cls.dbconnect(connect,result_dict)
cursor.execute(sql, args)
result = cursor.fetchall()
if cursor.rowcount > 0:
return True,result
else:
return False,None
except Exception as ex:
return False,ex
finally:
cls.dbconnect_close(conn,cursor)
@classmethod
@func_used_time
def insert(cls,connect=dict,result_dict=True,sql=None, args=None):
'''
# 单条写入
- 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时
'''
try:
conn, cursor = cls.dbconnect(connect,result_dict)
rows = cursor.execute(sql, args)
conn.commit()
if rows:
if rows > 0:
return True,rows
return False,rows
except Exception as ex:
return False,ex
finally:
cls.dbconnect_close(conn, cursor)
@classmethod
@func_used_time
def insert_many(cls,connect=dict,result_dict=True,sql=None, args=None):
'''
# 批量写入
- 参数:传入列表或元组
- 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时
'''
try:
conn, cursor = cls.dbconnect(connect,result_dict)
rows = cursor.executemany(sql, args)
conn.commit()
if rows:
if rows > 0:
return True,rows
return False,rows
except Exception as ex:
return False,ex
finally:
cls.dbconnect_close(conn, cursor)
@classmethod
@func_used_time
def update(cls,connect=dict,result_dict=True,sql=None, args=None):
'''
# 更新
- 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时
'''
try:
conn, cursor = cls.dbconnect(connect,result_dict)
rows = cursor.execute(sql, args)
conn.commit()
return True,rows
except Exception as ex:
return False,ex
finally:
cls.dbconnect_close(conn, cursor)
@classmethod
@func_used_time
def delete(cls,connect=dict,result_dict=True,sql=None, args=None):
'''
# 删除
- 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时
'''
try:
conn, cursor = cls.dbconnect(connect,result_dict)
rows = cursor.execute(sql, args)
conn.commit()
return True,rows
except Exception as ex:
return False,ex
finally:
cls.dbconnect_close(conn, cursor)
@classmethod
@func_used_time
def ddl(cls,connect=dict,sql=None):
'''
# 执行ddl语句
- 返回:True-成功,False-失败,ex-异常,time-耗时
'''
try:
conn, cursor = cls.dbconnect(connect)
cursor.execute(sql)
conn.commit()
return True,None
except Exception as ex:
return False,ex
finally:
cls.dbconnect_close(conn, cursor)
# 创建事物
@classmethod
def begin(cls,connect=dict):
'''
# 开启事物操作
- 返回:True-成功,False-失败,ex-异常,time-耗时
'''
conn, cursor = cls.dbconnect(connect)
conn.begin()
return conn,cursor
@classmethod
def begin_dml(cls,cursor=None,sql=None, args=None):
'''
# 执行事物SQL
- 返回:True-成功,False-失败,rows-影响行数/异常,time-耗时
'''
try:
rows = cursor.execute(sql, args)
if rows:
if rows > 0:
return True,rows
return True,None
except Exception as ex:
return False,ex
@classmethod
def commit(cls,conn=None,cursor=None):
'''
# 提交事物
- 返回:True-成功,False-失败,ex-异常,time-耗时
'''
try:
_commit = conn.commit()
return True,_commit
except Exception as ex:
return False,ex
finally:
cls.dbconnect_close(conn, cursor)
@classmethod
def rollback(cls,conn=None,cursor=None):
'''
# 回滚事物
- 返回:True-成功,False-失败,ex-异常,time-耗时
'''
try:
_rollback = conn.rollback()
return True,_rollback
except Exception as ex:
return False,ex
finally:
cls.dbconnect_close(conn, cursor)
最后修改时间:2021-12-29 17:21:56
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。