暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

--如何用PYTHON 定时打印 MYSQL FREE 使用率,与自动创建测试数据库表

AustinDatabases 2020-06-09
956

源数据库汇中,PYTHON 的使用不是一个可选项,主要在很多地方,监控,处理一些DEVOPS的事情,或者与业务有关的处理的工作都是需要PYTHON 来进行的。下面就用PTYHON 来完成一个很小的打印MYSQL 系统的内存占用率的小脚本来开始 PYTHON travel。(由于是初级水平有待提高,部分代码的有待进步)

在学习PYTHON 的过程中,(很菜)领会到PYTHON 本身的语法是一回事,你使用的各种包的熟悉又是另一回事。所以下面先得说说程序中使用的mysql 的 python connector.

PYTHON 连接到MYSQL 的包有很多 PYMYSQL , MYSQLAB, 这里没有使用而是使用了官方的  Connector/Python 的方式进行连接

下面相关的代码的初衷主要在分析一段时间INNODB BUFFER 的使用率,查看相关的变动情况,当然这样的监控也有各种图形化的监控软件,但灵活性不高

#!/usr/bin/env python3# coding: utf-8import mysql.connectorfrom mysql.connector import errorcodeimport reimport timeimport datetimeimport sys

class DBFREEMEMORY: def __init__(self, user=None, passwd=None, host=None, db=None): self.user = user self.passwd = passwd self.host = host self.db = db def mysql_connect(self): remotedb = { 'host': self.host , 'user': self.user, 'passwd': self.passwd, 'database': self.db, 'charset': 'utf8', 'connection_timeout': 30, 'use_pure': True } try: connect = mysql.connector.connect(**remotedb) mycursor = connect.cursor(dictionary=True)
          sql = "select substring_index(substring_index(event_name,'/',2),'/',-1) as event_type, round(sum(current_number_of_bytes_used)  1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"
          ) as event_type, round(sum(current_number_of_bytes_used)  1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"          mycursor.execute(sql)

memory = mycursor.fetchall()

#收集当前使用的内存数

sql_1 = "show global variables like 'innodb_buffer_pool_size';" mycursor.execute(sql_1) full_memory = mycursor.fetchall() #收集当前整体数据库占有的内存

for t in full_memory:

if t['Value'] != None: fmem = float(t['Value']) 1024 / 1024 else: t['Value'] = 1 for i in memory: if i['MB_CURRENTLY_USED'] != None: mem = i['MB_CURRENTLY_USED'] else: i['MB_CURRENTLY_USED'] = 1 result = format(float(mem) float(fmem) * 100, '.2f') print(str(result) + '%' + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) #将当前的内存使用数的百分比进行比较,并和当前时间一起打印 except mysql.connector.Error as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) finally: mycursor.close() connect.close()if __name__ == '__main__': info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema') info.mysql_connect()
          'host': self.host ,          'user': self.user,          'passwd': self.passwd,          'database': self.db,          'charset': 'utf8',          'connection_timeout': 30,          'use_pure': True        }        try:          connect = mysql.connector.connect(**remotedb)          mycursor = connect.cursor(dictionary=True)          sql = "select substring_index(substring_index(event_name,'/',2),'/',-1) as event_type, round(sum(current_number_of_bytes_used)  1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"          mycursor.execute(sql)          memory = mycursor.fetchall()          sql_1 = "show global variables  like 'innodb_buffer_pool_size';"          mycursor.execute(sql_1)          full_memory = mycursor.fetchall()          for t in full_memory:            if t['Value'] != None:              fmem = float(t['Value'])  1024 / 1024            else:              t['Value'] = 1          for i in memory:            if i['MB_CURRENTLY_USED'] != None:               mem = i['MB_CURRENTLY_USED']            else:              i['MB_CURRENTLY_USED'] = 1          result = format(float(mem)  float(fmem) * 100, '.2f')          print(str(result) + '%' + '  ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))        except mysql.connector.Error as err:          if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:            print("Something is wrong with your user name or password")          elif err.errno == errorcode.ER_BAD_DB_ERROR:            print("Database does not exist")          else:            print(err)if __name__ == '__main__':   info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')   info.mysql_connect()

下面一个程序是针对自动生成测试数据库表,下面会在数据库层面自动生成test 库 以及 test1表,并插入随机数 150万

#!/usr/bin/env python3# coding: utf-8from __future__ import print_functionimport mysql.connectorfrom mysql.connector import errorcodefrom datetime import date, datetime, timedeltaimport reimport timeimport datetimeimport sysimport randomclass DBFREEMEMORY:    def __init__(self, user=None, passwd=None, host=None, db=None):        self.user = user        self.passwd = passwd        self.host = host        self.db = db    def gen_random_string(self):  #产生随机内容的方法        char_list = list('1234567890' + '0123456789')        random.shuffle(char_list)        return ''.join(char_list)    def mysql_connect(self):        remotedb = {            'host': self.host,            'user': self.user,            'passwd': self.passwd,            'database': self.db,            'charset': 'utf8',            'connection_timeout': 30,            'use_pure': True        }        try:            connect = mysql.connector.connect(**remotedb)            mycursor = connect.cursor(dictionary=True)            #判断当前的服务器是否已经存在test数据库            mycursor.execute("show databases")            database = [mycursor.fetchall()]            # print (tables)            database_list = re.findall('(\'.*?\')', str(database))            database_list = [re.sub("'", '', each) for each in database_list]            print(database_list)            #如果存在test 数据库就直接退出            if 'test' in database_list:                print('The database of test has existed,it has deleted it,please run the job again')               

else:

#创建相关

mycursor.execute("create database test")

print('You have test database') DB_NAME = 'test' mycursor.execute("USE test".format(DB_NAME)) #建表 TABLES = {} TABLES['test'] = ( "CREATE TABLE `test1` (" " `id` int(11) NOT NULL AUTO_INCREMENT," " `content` varchar(200) NULL," " `hash` varchar(200) NULL," " `insert_date` date NULL," " PRIMARY KEY (`id`)" ") ENGINE=InnoDB") table_name = TABLES['test'] # mycursor.execute(table_name) mycursor.execute("show tables") table = [mycursor.fetchall()] # print (tables) table_list = re.findall('(\'.*?\')', str(table)) table_list = [re.sub("'", '', each) for each in table_list] print(table_list) #判断如果没有 if 'test1' in table_list: print('The table of test has existed,please delete it') else: try: #执行并开始插入数据 10000条一提交 mycursor.execute(table_name) #print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) s_time = time.time() i = 0 while i < 150000: content = self.gen_random_string() sql = "INSERT INTO test1 (content, hash,insert_date) VALUES ('%s', '%d',now())" \ % (content, hash(content)) mycursor.execute(sql) i += 1 if i % 10000 == 0: connect.commit() print(i) connect.close() print('You have test table') en_time = time.time() print(en_time-s_time) #print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) except Exception as e: print(e)

except mysql.connector.Error as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) finally: mycursor.close() connect.close()if __name__ == '__main__':
info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')

info.mysql_connect()



由于微信中的PYTHON 格式有问题,如需下载学习可以到QQ 群下载文件




最后修改时间:2020-06-09 09:17:18
文章转载自AustinDatabases,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论