暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Python数据核对系列之4-数据插入数据库的4种姿势

龙小马的数字化之路 2021-08-03
3215
在前面的文章中我们介绍了如何通过python连接数据库然后读取到我们需要的数据,然后放入pandas的DataFrame中进行数据清洗处理。那么如果处理完成的数据再能插入回数据库中,那么就能形成完美闭环了!
本文我们将介绍下如何使用python来完成数据的插入数据库操作。

1.to_sql一步到位


前面我们讲解pandas获取数据库数据的时候提到过利用pandas的read_sql来读取数据库数据,相应的这个方法有个对应插入数据库的操作to_sql。
    import pandas as pd
    import sqlalchemy


    user = 'user' # 数据库用户名
    password = 'password' # 数据库密码
    ip = '192.168.0.1:1521/orcl' # 数据库地址
    conn = sqlalchemy.create_engine('oracle+cx_oracle://'+user+':'+password+'@'+ip)
    sql = '''
      select * from table
    '''
    df = pd.read_sql(sql,conn) 
    df.to_sql('tablename',conn,if_exists='append',index=False)
    上面一段代码就可以帮我们完成读取数据到插入数据的整个过程。
    我们先来说说sqlalchemy这个东东。
      pip install sqlalchemy # 安装sqlalchemy库
      https://docs.sqlalchemy.org/en/13/
      这是sqlalchemy的官方文档。它是Python 中一个通过 ORM (Object Relational Mapper,对象关系映射)操作数据库的框架。上面,我们只用到了它的创建数据库连接功能。通过下面的截图也可以了解到,sqlalchemy可以跟主流数据库都可以建立连接。

      sqlalchemy的create_engine可以完成跟数据库建立连接的动作。不同数据库使用不同的数据库引擎。
        engine = create_engine("postgresql+psycopg2://user:password@host/dbname")
        engine = create_engine("mysql+pymysql://scott:tiger@localhost/test")
        # create_engine的第一个参数的组成内容
        dialect[+driver]://user:password@host/dbname
        ps:在to_sql的调用上,我尝试直接使用cx_Oracle创建的连接对象作为to_sql的参数,但是执行失败了。在pandas的官方文档里介绍to_sql的时候也默认使用的连接数据库的工具也是sqlalchemy。

          DataFrame.to_sql(name,  # 要插入数据的表名称
                           con,  # 数据库连接对象
                           schema=None# 数据库模式,具体看数据库
                           if_exists='fail',  # 如果插入数据的表已经存在,那么,fail:返回失败;append:追加到原表;replace:替换原表数据
                           index=True,  # 是否插入DataFrame的index列的数据
                           index_label=None# index列数据插入后的列名称
                           chunksize=None,  # 每次执行插入操作的数据条数。100:每次插入100条。None:一次性插入所有数据
                           dtype=None# 插入数据的数据类型
                           method=None# 插入操作。{None,multi,callable} None:标准插入操作;multi:为单个insert操作指定多行值;callable:callable with signature (pd_table, conn, keys, data_iter).callable暂时没搞懂
          我们来看下不同chunksize的插入性能。

          1300条10列数据插入时间竟然需要3分半,这速度是不能让人满意的!不过这个方案的好处是,代码真简洁啊!



          2.insert第一版


          从上面的表现来看,to_sql()的插入数据性能并不能让人满意。毕竟如果随随便便就百万数据的话,这速度得累死人。
          那么我们来看看如果使用sql里的insert语句性能会如何。
            import cx_Oracle
            user = 'user'
            password = '123456'
            ip = '192.168.0.1:1521/orcl'
            conn = cx_Oracle.connect(user,password,ip) # 创建数据库连接对象
            cur = conn.cursor() # 创建cursor游标
            for row in df1.itertuples(index=False): # 对DataFrame进行遍历,读取数据
            data = row[:]
            isnert_sql = '''
            insert into DIM_CITY_PYTHON (CCITYCODE,CCITYNAME,CCODE,CNAME,PARTCODE,PARTNAME)
            values('%s','%s','%s','%s','%s','%s')
            '''%data
            # print(isnert_sql)
            try:
            cur.execute(isnert_sql)
            conn.commit()
            except:
            conn.rollback()

            从结果来看,我们执行了7次循环,每次循环执行1次,平均执行时间66秒,比to_sql()的3分钟已经快了很多了。
            我们来拆解下代码。
            获取数据库连接和创建cursor对象我们就不赘述了。
            咱们先来聊聊如何遍历DataFrame数据。

            在pandas的文档中第3部分就是介绍重要的基础功能的,而DataFrame的遍历操作就在其中的iteration部分。
            获取到DataFrame之后,我们可以通过以下代码遍历它的列名。
              for col in df:
                 print(col)
              通过items()我们可以遍历DataFrame的列名和列值
                for label,values in df.items():
                print(label,values)
                如果想要遍历DataFrame的每一行数据,可以通过iterrows(),itertuples()来完成。
                iterrows:它以(索引,序列)对的形式遍历DataFrame的每一行。可以将行转换为序列,会更改数据类型而且对性能有一定影响。

                itertuples():它遍历DataFrame的行获得列名、列值形式的命名元组。这个方法因为把数据存储在元组中,因此会保留数据的类型,并且比iterrows()方法性能快很多。大多数情况下适合用itertuples()来迭代DataFrame的行数据。如果itertuples()未设置index=False,那么得到的结果的第一个元素将是索引值;如果itertuples(index=False)那么得到的结果将从索引后面的列开始。

                从上面的截图就可以清晰的看到itertuples()遍历得到的结果是什么。我们可以通过a[2]的形式来获得遍历得到的行数据的元组内的某列的值。还可以通过a[:]的方式获取整行数据的tuple元组。
                我们获取DataFrame每行数据就可以通过insert语句将数据插入到对应的数据库的表中了。

                3.insert第二版


                上面我们的插入方案是每次插入一条数据。如果我们能每次插入多条数据的话,性能上有什么提升呢?
                一次性插入多条数据是通过以下sql来完成的
                  insert into table (a,b,c,d……) --字段
                  select values1,values2,values3…… from dual --要插入的值。from dual 是oracle的写法。
                  union all
                  select values1,values2,values3…… from dual
                  union all
                  select values1,values2,values3…… from dual
                  ……
                  通过上面这段SQL我们可以一次性将多条数据插入数据库中,减少插入操作。具体代码如下
                    %%time # jupyter notebook的魔法函数,可以查看代码块的执行时间
                    cur = conn.cursor() # 创建cursor游标
                    sql = 'insert into DIM_CITY_PYTHON (CCITYCODE,CCITYNAME,CCODE,CNAME,PARTCODE,PARTNAME,SAPCOMPANYNAME_ORDER,SAPCOMPANYCODE,CREATE_TIME,GDP)'
                    i = 0 # 设置起始值
                    a = 1000 # 设置每次插入的数据条数
                    b = len(df)-1
                    for row in df.itertuples(): # 对DataFrame进行遍历,读取数据
                        data = row[1:] # 获取遍历DataFrame中的要插入的字段值。row[0]是索引值。data是个tuple,所以下面的sql代码后面直接使用的%data
                    if i < a and row[0] < b:
                            sql = sql + "select '%s','%s','%s','%s','%s','%s','%s','%s',to_date('%s','yyyy-mm-dd HH24:mi:ss'),%f from dual \n" %data + 'union all \n'
                    else:
                    sql = sql + "select '%s','%s','%s','%s','%s','%s','%s','%s',to_date('%s','yyyy-mm-dd HH24:mi:ss'),%f from dual"%data
                    try:
                    cur.execute(sql)
                    conn.commit()
                    except:
                    conn.rollback()
                    sql = 'insert into DIM_CITY_PYTHON (CCITYCODE,CCITYNAME,CCODE,CNAME,PARTCODE,PARTNAME,SAPCOMPANYNAME_ORDER,SAPCOMPANYCODE,CREATE_TIME,GDP)'
                    i = 0
                    i += 1
                    print('插入完成!')
                    执行结果如下图

                    我们可以看到一次插入1000条时,整个执行时间是280ms左右。不到1秒时间就搞定了。
                    从开始的3分钟到1分钟再到现在的不到1秒,速度上已经飞跃提升了。
                    如果我们把每次插入的数据量提升到10000条,可以看到时间可以压缩到190ms左右


                    4.insert第三版-万能插入代码


                    我们已经将插入速度提升到毫秒级了,那么我们还能继续提升这个速度吗?
                    我们还有一种方法可以完成数据的快速插入。参考代码如下
                      # oracle数据库的万能插入数据代码。只要给出的插入数据字段能跟数据库表对应上。
                      def get_conn():
                      user = 'user'
                      pwd = '123456'
                      ip = 'localhost:1521/orcl'
                      conn = cx_Oracle.SessionPool(user,pwd,ip,min=2,max=6).acquire()
                      return conn


                      def insert_data(name,data): # name:插入的表名称;data:要插入的数据的DataFrame格式
                      with get_conn() as conn: # get_conn() 获取数据库连接对象的函数
                      cur = conn.cursor()
                      columns = data.columns # 获取要出入数据的列名称
                      tmp = ''
                      fields = ''
                      length_columns = len(columns)
                      for i in range(length_columns): # 获取要插入的列的数量及插入代码
                      tmp = tmp + ':' + str(i+1)
                      fields = fields + columns[i]
                      if i < length_columns-1:
                      tmp += ','
                      fields += ','


                              insert_sql = ''' # 要执行插入操作的SQL
                      insert into %s (%s)
                      values(%s)
                      '''%(name,fields,tmp)


                      insert_data = []
                              for rows in data.itertuples(): # 遍历要插入的数据DataFrame
                                  row = rows[1:] # 获取需要插入的数据。注意:rows[0]是索引。
                      insert_data.append(row)
                                  if (rows[0]+1) % 10000 == 0 or rows[0] == len(data)-1# 每次插入10000条。跟上面代码不同的是,我们这里直接使用索引来判断每次插入的数量和是否达到最后一行数据。
                      cur.executemany(insert_sql,insert_data)
                      conn.commit()
                      insert_data = []
                      上面这段代码我们使用的是cursor的executemany方法。
                      按照cx_Oracle官方文档的说明。使用cursor.executemany()处理大数据量的插入、更新操作,在性能上要比重复调用cursor.execute()更有效的多。cursor.executemany可以降低网路传输成本和数据库的开销。其实这个原理跟我们上面的处理方式一致,都是通过一次插入大量数据来减少网络传输次数和数据库开销,以此来提升速度。因为这个发放跟上面的在执行时间上基本一样。但它优胜在代码更加简洁,而且适应性更强。
                      使用cursor.executemany()来操作大数据量(不仅是行数多,也跟列数有关)时,也需要注意缓存量限制和网络限制。尽量将每次操作的数据量控制在缓存、网络传输的最佳性能上。

                      Cursor.executemany(sql,插入数据)
                      sql的写法
                        # oracle
                        insert into table_name (字段1,字段2,字段3……)
                        values (:1,:2,:3……) # 注意这里的数字是从1开始的,结尾的数字要跟上面的字段数量对应上


                        # sqlserver
                        insert into table_name(字段1,字段2,字段3……)
                        values (%s,%d,%s……) # 这里的s/d要跟上面字段的数据类型对应上

                        下图是sqlserver的示例代码

                        要插入的数据格式:

                        它是一个列表数据,列表内部存储的是每行数据的元组。

                          data = [
                          ( 110, "Parent 110"),
                          ( 2000, "Parent 2000"),
                          ( 30000, "Parent 30000"),
                          ( 400000, "Parent 400000"),
                          (5000000, "Parent 5000000")
                          ]
                          cursor.executemany(""" insert into ParentTable (ParentId, Description) values (:1, :2)""", data)

                          而当我们使用itertuples()方法对DataFrame进行遍历时,刚好得到的结果就是元组类型的每行数据。因此我们获取它的遍历结果放入一个空列表中,就能得到executemany()方法所需的data了。


                          人生苦短,我用Python!

                          一切皆是信息,万物源自比特!数字化必定会深刻革命我们的办公和生活!

                          简道云,中小企业数字化之路的绝佳伴侣!

                          Python,把自己从低效、重复的消耗中解放出来!


                          本公众号将分享数字化的实践、学习、思考。也许涉及信息化系统设计、各种办公软件、数据分析、理论知识、实践案例…… 感谢你与我一同成长……

                          如果有关企业数字化的疑问、思考和讨论 或者 关于简道云的应用搭建、数据工厂、仪表盘等疑问咨询或者合作,欢迎与我联系。关注公众号,可以找到我的联系方式

                          文章转载自龙小马的数字化之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论