暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

金融数据导入手册之:逐笔数据篇

1134

将历史数据导入数据库是进行数据查询、计算和分析的基础。我们搜集了用户在数据导入实操中常见的各类问题,以 CSV 格式的文件为例,为大家整理了金融逐笔数据导入的完整操作步骤。

本文将概述数据导入前的准备工作和导入的核心步骤。完整教程已发布在官方知乎,可点击阅读原文查看。

准备工作示例

数据导入的准备工作主要包含三个方面:
(1)数据源分析:从 DolphinDB 数据类型兼容性的角度分析数据源,选择满足建库建表要求的方案。
(2)规划存储方案:分析表连接需求,选择适合的存储方案。当没有表连接分析的需求时,推荐单库单表存储数据;当有表连接需求时,推荐一库多表存储数据。
(3)规划分区:对于 level2 逐笔数据的场景,我们推荐复合分区,先按日期做值分区,再按股票代码做 HASH 分区。
我们以上市委托数据 CSV 文件为例,使用 Linux 系统的 head 命令打开该文件:
可以看到,这个 CSV 文件有如下特点:
  • 第一行是文件说明,后续各种读取都需要跳过这一行;
  • 从第二行开始是数据,没有列名,在建表时需要根据数据的说明文档定义字段名称和字段类型。
从左至右的字段名根据上市的说明文档定义为:SecurityID, TransactTime, valOrderNoue, Price, Balance, OrderBSFlag, OrdType, OrderIndex, ChannelNo, BizIndex。
其中 SecurityID, OrderBSFlag 和 OrdType 为重复较多的有限数量的字符串,使用 SYMBOL 类型;TransactTime 为从年到毫秒的日期,使用 TIMESTAMP 数据类型;其它的字段没有特殊之处,整数用 INT,浮点数用 DOUBLE。
所以,从左至右存储字段的数据类型定义为:SYMBOL, TIMESTAMP, INT, DOUBLE, INT, SYMBOL, SYMBOL, INT, INT, INT。
本教程推荐选用 TSDB 引擎。上市每天逐笔委托数据大小在 3GB 左右,先按日期做值分区,再用股票代码做7个 HASH 分区。按日期值分区时,VALUE 的初始值写两三天的初始值即可,实际分区值会根据数据的实际日期自动扩展。

数据导入与清洗转换

DolphinDB 导入数据的核心函数是 loadTextEx,可用于 CSV 文件读取、数据清洗和入库一体化操作。导入数据核心代码如下:
    db = database("dfs://sh_entrust")
    def transType(mutable memTable)
    {
    return memTable.replaceColumn!(`col0,string(memTable.col0)).replaceColumn!(`col1,datetimeParse(string(memTable.col1),"yyyyMMddHHmmssSSS")).replaceColumn!(`col5,string(memTable.col5)).replaceColumn!(`col6,string(memTable.col6))
    }
    filePath = "/home/ychan/data/loadForPoc/SH/Order/20210104/Entrust.csv"
    loadTextEx(dbHandle = db, tableName = `entrust, partitionColumns = `col1`col0, filename = filePath, skipRows = 1,transform = transType)

    核心代码中,使用了 loadTextEx 函数,其中 transform 参数引用了 transType 函数定义,其作用是数据清洗和类型转换。

    transform 能够非常方便地完成但不限于如下需求:
    (1)转换数据类型
    例如,当传入的 SecurityID 数据类型为整型,不符合 SYMBOL 的要求而报错时,可以使用 transType 函数自定义转换数据类型,赋给 transform 参数后再执行导入语句。相关代码如下:
      def transType(mutable memTable)
      {
      return memTable.replaceColumn!(`col0,string(memTable.col0)).replaceColumn!(`col1,datetimeParse(string(memTable.col1),"yyyyMMddHHmmssSSS")).replaceColumn!(`col5,string(memTable.col5)).replaceColumn!(`col6,string(memTable.col6))
      }

      (2)在 CSV 文件的基础上增加列

      有时 CSV 文件会缺少某些列,比如缺少日期,但通过文件名给出了日期信息。这种情况下我们可以通过 transform 参数引用的函数,增加列并赋值。代码如下:
        def addCol(mutable memTable,datePara)
        {
        update memTable set date = datePara
        return memTable
        }

        (3)过滤无效数据

        当需要把 CSV 文件中的无效数据过滤掉再写入分布式表时,可以在 transform 参数引用的函数中使用 select 语句筛选出符合条件的数据。例如只写入价格大于0的数据,函数定义的代码为:
          def fliterData(mutable memTable)
          {
          return select * from memTable where price > 0
          }

          (4)转换字符编码

          为了显示正常,有时候需要把 GBK 编码的列转成 UTF-8。transform 参数引用的函数的代码为:
            def addCol(mutable memTable)
            {
            return mutable.replaceColumn!(`custname,toUTF8(mutable.custname,`gbk))
            }

            (5)导入部分列

            可通过在 transform 参数引用的函数中筛选出所需列,实现导入部分列,代码如下:
              def partCol(mutable memTable)
              {
              return select [需要的部分列名] from memTable
              }

              常见问题

              只提交了一个文件的导入,长时间执行不完,硬盘也没有写入,这是什么原因?

              因为单个 CSV 文件太大,缓存不够用。先把 OLAPCacheEngineSize 和 TSDBCacheEngineSize 两个参数的值修改为大于 CSV 文件的大小,再重启系统即可。


              执行过程中,报 out of memory 错误,怎么处理?

              1)如果使用的是社区版本 license,请联系负责支持的销售人员,获取试用版本 license。2)查看 maxMemSize 参数的配置是否远小于系统内存,建议配置为系统内存的80%。3)检查 workerNum 和 localExecutors 配置,合理配置值的计算方法为:可用内存除以单个文件大小向下取整得到 workerNum 的值,localExecutors 的值为 workerNum 减 1。


              nsf 系统导入时,报 Bad file descriptor 错误,怎么解决?

              nfs 文件需要用 v3 版本,并设置 local_lock 参数为 all 的方式进行挂载。


              数据如何去重?

              建表时指定 keepDuplicates 参数的值可以去重,提供以下选项:

              • ALL:保留所有数据

              • LAST:仅保留最新数据

              • FIRST:仅保留第一条数据


              Explore More



              扫描二维码,添加 DolphinDB小助手
              点击阅读原文,查看完整数据导入教程

              文章转载自DolphinDB智臾科技,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论