暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

分享一个用Python2.7编写的mssql数据库操作类脚本

SQLServer走起 2020-01-27
1314

这个脚本需要先安装Python第三方包:pymssql

安装命令

    pip install pymssql

    这个脚本需要依赖几个辅助脚本

    except_helper.py:打印程序的一些堆栈信息

      #!/usr/bin/env python
      # coding=utf-8


      import os
      import sys


      def detailtrace():
      """获取程序当前运行的堆栈信息"""
      retStr = ""
      f = sys._getframe()
      f = f.f_back # first frame is detailtrace, ignore it
      while hasattr(f, "f_code"):
      co = f.f_code
      retStr = "%s(%s:%s)->"%(os.path.basename(co.co_filename),
      co.co_name,
      f.f_lineno) + retStr
      f = f.f_back
      return retStr

      log_helper.py:记录程序出错日志

        #!/usr/bin/env python
        # coding=utf-8


        from __future__ import unicode_literals
        import logging
        import os
        import traceback


        from common import except_helper




        # 生成一个以当前文件名为名字的logger实例
        logger = logging.getLogger(__name__)








        def info(content):
        """记录日志信息"""
        if content:
        logger.info(content)


        def error(content = '', is_send_mail = True):
        """记录错误日志信息"""
        if traceback:
        content = content + '\n' + traceback.format_exc() + '\n'
        # 获取程序当前运行的堆栈信息
        detailtrace = except_helper.detailtrace()


        content = content + '程序调用堆栈的日志:' + detailtrace + '\n'
        logger.info(content)


        config_helper.py:读取配置文件里面的数据库信息

          #!/usr/bin/env python
          # -*- coding:utf-8 -*-
          # @File : config_helper.py
          # @Description :读取配置文件配置




          from __future__ import unicode_literals
          import os
          import codecs


          import configparser




          def get_config(args):
          '''
          读取配置文件
          :param args:
          :return:
          '''
          dirs = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
          config = configparser.ConfigParser(allow_no_value=True)
          dict_config = {}
          with codecs.open(dirs + '/client.conf', 'r') as cfgfile:
          config.read_file(cfgfile)
          if config.has_section('mssqldb'):
          dict_config['dbhost'] = config.get('mssqldb', 'dbhost')
          dict_config['dbport'] = config.get('mssqldb', 'dbport')
          dict_config['dbname'] = config.get('mssqldb', 'dbname')
          dict_config['dbuser'] = config.get('mssqldb', 'dbuser')
          dict_config['dbpassword'] = config.get('mssqldb', 'dbpassword')


          # 根据传入参数返回变量以获取配置,返回变量名与参数名相同
          if args and dict_config.has_key(args):
          return dict_config[args]
          else:
          return ''


          配置文件名:client.conf,而配置文件大概长这样

            [mssqldb]
            dbhost = 192.168.1.16
            dbport = 1433
            dbname = master
            dbuser = sa
            dbpassword = 123456

            mssqldb_helper.py:这篇文章的核心,读写数据库的数据

              #!/usr/bin/env python
              # coding=utf-8


              from __future__ import unicode_literals
              import time


              import pymssql


              from common import log_helper
              from common.config_helper import get_config




              # 初始化数据库参数
              DB_NAME = get_config('dbname')
              DB_HOST = get_config('dbhost')
              DB_PORT = get_config('dbport')
              DB_USER = get_config('dbuser')
              DB_PASS = get_config('dbpassword')
              # db_charset = 'utf-8'






              class MSSQLHelper(object):
              """MSSQL数据库操作类"""


              def __init__(self, is_output_sql=False):
              self.connect = None
              self.cursor = None
              # 是否将所有要执行的Sql语句输出到日志里
              self.is_output_sql = is_output_sql


              def open_conn(self):
              """连接数据库,并建立游标"""
              try:
              if not self.connect:
              self.connect = pymssql.connect(database=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST, port=DB_PORT)
              self.connect.autocommit(True)
              return self.connect
              except Exception as e:
              args = str(e.args)
              log_helper.error('连接数据库失败:' + args)
              return False


              def close_conn(self):
              """关闭数据库链接"""
              # 关闭游标
              try:
              if self.cursor:
              self.cursor.close()
              except Exception:
              pass
              # 关闭数据库链接
              try:
              if self.connect:
              self.connect.close()
              except Exception:
              pass


              def __enter__(self):
              """初始化数据库链接"""
              self.open_conn()
              return self # 返回类实例 with形式


              def __exit__(self, type, value, trace):
              """关闭数据库链接"""
              self.close_conn()


              def rollback(self):
              """回滚操作"""
              try:
              # 异常时,进行回滚操作
              if self.connect:
              self.connect.rollback()
              except Exception as e:
              args = str(e.args)
              log_helper.error('回滚操作失败:' + args)




              def read(self,sql):
              """
              连接mssql数据库并进行数据查询
              如果连接失败,会把错误写入日志中,并返回false,如果sql执行失败,也会把错误写入日志中,并返回false
              如果所有执行正常,则返回查询到的数据,这个数据是经过转换的,转成字典格式,方便模板调用,其中字典的key是数据表里的字段名
              """
              if not sql:
              return None
              # 记录程序执行开始时间
              start_time = time.time()
              try:
              # 建立游标
              self.cursor = self.connect.cursor(as_dict=True)
              # 执行查询操作
              self.cursor.execute(sql)
              except Exception as e:
              args = str(e.args)
              # 将异常写入到日志中
              log_helper.error('sql执行失败:' + args + ' query:' + str(sql))
              self.data = None
              else:
              # 获取数据
              try:
              if self.cursor.description:
              # 将返回的结果转换成字典格式
              self.data = self.cursor.fetchall()
              else:
              if self.cursor.rowcount < 0:
              self.data = 0
              else:
              self.data = self.cursor.rowcount
              except Exception as e:
              args = str(e.args)
              if not args.find('duplicate key value violates unique '):
              # 将异常写入到日志中
              log_helper.error('数据获取失败:' + args + ' query:' + str(sql))
              self.data = None
              finally:
              # 关闭游标
              self.cursor.close()


              # 记录程序执行结束时间
              end_time = time.time()
              # 判断是否记录sql执行语句
              if self.is_output_sql:
              self.write_log(start_time, end_time, sql)


              # 返回结果(字典格式)
              return self.data




              def write(self,sql):
              """
              连接mssql数据库并进行写的操作
              如果连接失败,会把错误写入日志中,并返回false,如果sql执行失败,也会把错误写入日志中,并返回false,如果所有执行正常,则返回true
              """
              if not sql:
              return None
              # 记录程序执行开始时间
              start_time = time.time()
              try:
              # 建立游标
              self.cursor = self.connect.cursor(as_dict=True)
              # 执行查询操作
              self.cursor.execute(sql)
              except Exception as e:
              args = str(e.args)
              # 如果出错,则事务回滚
              self.rollback()
              # 将异常写入到日志中
              log_helper.error('sql执行失败:' + args + ' query:' + str(sql))
              self.data = None
              else:
              # 获取数据
              try:
              if self.cursor.description:
              # 将返回的结果转换成字典格式
              self.data = self.cursor.fetchall()
              else:
              self.data = self.cursor.rowcount
              except Exception as e:
              args = str(e.args)
              if not args.find('duplicate key value violates unique '):
              # 将异常写入到日志中
              log_helper.error('数据获取失败:' + args + ' query:' + str(sql))
              self.data = None
              finally:
              # 关闭游标
              self.cursor.close()


              # 记录程序执行结束时间
              end_time = time.time()
              # 判断是否记录sql执行语句
              if self.is_output_sql:
              self.write_log(start_time, end_time, sql)


              # 返回结果(字典格式)
              return self.data




              def write_log(self, start_time, end_time, sql):
              """记录Sql执行超时日志"""
              t = end_time - start_time
              if (t) > 0.1:
              content = ' '.join(('run time:', str(t), 's sql:', sql))
              log_helper.info(content)







              一整套的mssql数据库操作都已经封装好了,大家可以直接拿去用

              因为这些都是公共类库,所以都放在了common这个文件夹下,大家可以按照自己的习惯去分类整理这些公用类库文件



              最近学习到“时间复利”这个词,一般用在投资上,其实走起君觉得这个词也可以用在看书上,每天利用通勤时间,看几页书本,一年下来就可以看好几本书,积累很多知识,因为看书学习也是一个循序渐进的过程


              最近肺炎疫情严重,走起君也希望大家 身体健康,平平安安!





              最后修改时间:2020-01-27 14:34:29
              文章转载自SQLServer走起,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论