
开源数据库中,PYTHON 的使用不是一个可选项:在很多地方,例如监控、处理一些 DEVOPS 的事情,或者与业务有关的处理工作,都需要用 PYTHON 来完成。下面就用 PYTHON 写一个打印 MYSQL 系统内存占用率的小脚本,来开始 PYTHON travel。(由于是初级水平,部分代码还有待改进)
在学习 PYTHON 的过程中,(很菜)领会到 PYTHON 本身的语法是一回事,对所使用的各种包的熟悉程度又是另一回事。所以下面先得说说程序中使用的 mysql 的 python connector。
PYTHON 连接到 MYSQL 的包有很多,例如 PyMySQL、MySQLdb,这里没有使用它们,而是使用了官方的 Connector/Python 进行连接。
下面代码的初衷主要是分析一段时间内 INNODB BUFFER 的使用率,查看其变动情况。当然这样的监控也有各种图形化的监控软件,但灵活性不高。
#!/usr/bin/env python3
# coding: utf-8
"""Print MySQL's instrumented memory usage as a share of innodb_buffer_pool_size.

The script connects with MySQL Connector/Python, reads the per-event-type
memory totals from performance_schema and prints one line of the form
``<percent>% <YYYY-mm-dd HH:MM:SS>``.
"""
import mysql.connector
from mysql.connector import errorcode
import re
import time
import datetime
import sys

# Memory currently allocated per event type (second component of the
# instrument name, e.g. "innodb" or "sql"), in MB.
MEMORY_BY_EVENT_TYPE_SQL = (
    "select substring_index(substring_index(event_name,'/',2),'/',-1) as event_type, "
    "round(sum(current_number_of_bytes_used)/1024/1024, 2) as MB_CURRENTLY_USED "
    "from performance_schema.memory_summary_global_by_event_name "
    "group by event_type having mb_currently_used >0"
)

BUFFER_POOL_SIZE_SQL = "show global variables like 'innodb_buffer_pool_size';"


class DBFREEMEMORY:
    """Report MySQL memory usage relative to the InnoDB buffer pool size."""

    def __init__(self, user=None, passwd=None, host=None, db=None):
        """Store the connection settings; nothing is opened until mysql_connect()."""
        self.user = user
        self.passwd = passwd
        self.host = host
        self.db = db

    def mysql_connect(self):
        """Connect, compute the usage percentage and print it with a timestamp.

        Connection and query errors are reported with print() rather than
        raised, so the method always returns None.
        """
        remotedb = {
            'host': self.host,
            'user': self.user,
            'passwd': self.passwd,
            'database': self.db,
            'charset': 'utf8',
            'connection_timeout': 30,
            'use_pure': True,
        }

        # Pre-bind both handles so the finally block is safe even when
        # connect() itself fails.
        connect = None
        mycursor = None
        try:
            connect = mysql.connector.connect(**remotedb)
            mycursor = connect.cursor(dictionary=True)

            # Collect the memory currently in use, one row per event type.
            mycursor.execute(MEMORY_BY_EVENT_TYPE_SQL)
            memory = mycursor.fetchall()

            # Collect the configured buffer pool size (bytes).
            mycursor.execute(BUFFER_POOL_SIZE_SQL)
            full_memory = mycursor.fetchall()

            fmem = None
            for t in full_memory:
                if t['Value'] is not None:
                    fmem = float(t['Value']) / 1024 / 1024
            if not fmem:
                print("innodb_buffer_pool_size is not available")
                return

            # NOTE(review): the original loop overwrote `mem` on every row, so
            # only an arbitrary last event type was reported (GROUP BY gives no
            # guaranteed order). Summing all event types is assumed to be the
            # intent -- confirm.
            mem = sum(
                float(i['MB_CURRENTLY_USED'])
                for i in memory
                if i['MB_CURRENTLY_USED'] is not None
            )

            # Print the usage percentage together with the current local time.
            result = format(mem / fmem * 100, '.2f')
            print(str(result) + '%' + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)
        finally:
            if mycursor is not None:
                mycursor.close()
            if connect is not None:
                connect.close()


if __name__ == '__main__':
    info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')
    info.mysql_connect()
下面这个程序用于自动生成测试用的数据库和表:它会在数据库层面自动创建 test 库以及 test1 表,并插入 15 万(150000)条随机数据。
#!/usr/bin/env python3# coding: utf-8from __future__ import print_functionimport mysql.connectorfrom mysql.connector import errorcodefrom datetime import date, datetime, timedeltaimport reimport timeimport datetimeimport sysimport randomclass DBFREEMEMORY: def __init__(self, user=None, passwd=None, host=None, db=None): self.user = user self.passwd = passwd self.host = host self.db = db def gen_random_string(self): #产生随机内容的方法 char_list = list('1234567890' + '0123456789') random.shuffle(char_list) return ''.join(char_list) def mysql_connect(self): remotedb = { 'host': self.host, 'user': self.user, 'passwd': self.passwd, 'database': self.db, 'charset': 'utf8', 'connection_timeout': 30, 'use_pure': True } try: connect = mysql.connector.connect(**remotedb) mycursor = connect.cursor(dictionary=True) #判断当前的服务器是否已经存在test数据库 mycursor.execute("show databases") database = [mycursor.fetchall()] # print (tables) database_list = re.findall('(\'.*?\')', str(database)) database_list = [re.sub("'", '', each) for each in database_list] print(database_list) #如果存在test 数据库就直接退出 if 'test' in database_list: print('The database of test has existed,it has deleted it,please run the job again')else:
#创建相关
mycursor.execute("create database test")
print('You have test database') DB_NAME = 'test' mycursor.execute("USE test".format(DB_NAME)) #建表 TABLES = {} TABLES['test'] = ( "CREATE TABLE `test1` (" " `id` int(11) NOT NULL AUTO_INCREMENT," " `content` varchar(200) NULL," " `hash` varchar(200) NULL," " `insert_date` date NULL," " PRIMARY KEY (`id`)" ") ENGINE=InnoDB") table_name = TABLES['test'] # mycursor.execute(table_name) mycursor.execute("show tables") table = [mycursor.fetchall()] # print (tables) table_list = re.findall('(\'.*?\')', str(table)) table_list = [re.sub("'", '', each) for each in table_list] print(table_list) #判断如果没有 if 'test1' in table_list: print('The table of test has existed,please delete it') else: try: #执行并开始插入数据 10000条一提交 mycursor.execute(table_name) #print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) s_time = time.time() i = 0 while i < 150000: content = self.gen_random_string() sql = "INSERT INTO test1 (content, hash,insert_date) VALUES ('%s', '%d',now())" \ % (content, hash(content)) mycursor.execute(sql) i += 1 if i % 10000 == 0: connect.commit() print(i) connect.close() print('You have test table') en_time = time.time() print(en_time-s_time) #print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) except Exception as e: print(e) except mysql.connector.Error as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) finally: mycursor.close() connect.close()if __name__ == '__main__':
info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')info.mysql_connect()
由于微信中的PYTHON 格式有问题,如需下载学习可以到QQ 群下载文件



最后修改时间:2020-06-09 09:17:18
文章转载自AustinDatabases,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




