暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何用 DolphinDB + Kafka 实时计算K线

801

DolphinDB Kafka 插件支持把 DolphinDB 中生产的数据推送到 Kafka,也支持从 Kafka 订阅数据,并在 DolphinDB 中消费。本文将为大家展示如何通过该插件实时计算k线。DolphinDB Kafka 插件的完整实践指南已发布在官方知乎,可点击阅读原文查看。

DolphinDB Kafka 插件

Kafka 是一个高吞吐量的分布式消息中间件,可用于海量消息的发布和订阅。
当面对大量的数据写入时,以消息中间件接收数据,然后再批量写入到时序数据库中,这样可以将消息中间件的高并发能力时序数据库的高吞吐量联合起来,更好地解决海量数据的实时处理和存储问题
DolphinDB Kafka 插件目前支持以下数据类型的序列化和反序列化:
  • DolphinDB 标量
  • Kafka Java API 的内置类型:String(UTF-8) , Short , Integer , Long , Float , Double , Bytes , byte[] 以及 ByteBuffer
  • 以上数据类型所组成的向量
用户可以在 DolphinDB 中实例化 Producer 对象,把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。用户也可以在 DolphinDB 中实例化 Consumer 对象,将 Kafka 中指定 Topic 的数据同步到 DolphinDB。
DolphinDB Kafka 插件的基本使用教程,包括如何安装插件、如何使用 Producer、如何使用 Consumer,请点击阅读原文查看。

案例:如何实时计算K线

本节将展示如何使用“DolphinDB+Kafka”实时计算K线。
1. 环境准备
  • 部署 DolphinDB 集群,版本为v2.00.7

  • 部署 Kafka 集群,版本为 2.13-3.1.0

详细部署教程可点击阅读原文查看。

2. 生产数据
配置好相关环境后,我们首先通过 DolphinDB 的 replay 历史数据回放工具和 Kafka 插件,把逐笔成交数据实时发送到 Kafka 中。主要包含三个步骤:
  • Kafka 创建 Topic
    ./bin/kafka-topics.sh --create --topic topic-message --bootstrap-server 192.193.168.4:9092
    Created topic topic-message.
    • 加载 Kafka 插件并创建 Kafka Producer
      // 加载插件
      path = "/DolphinDB/server/plugins/kafka"
      loadPlugin(path + "/PluginKafka.txt")
      loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt");


      // 定义创建 Kafka Producer 的函数
      def initKafkaProducerFunc(metadataBrokerList){
      producerCfg = dict(STRING, ANY)
      producerCfg["metadata.broker.list"] = metadataBrokerList
      return kafka::producer(producerCfg)
      }


      // 创建 Kafka Producer 并返回句柄
      producer = initKafkaProducerFunc("192.193.168.5:8992")
      • 推送数据到 Kafka Topic
        // 定义推送数据到 KafKa "topic-message" Topic 的函数
        def sendMsgToKafkaFunc(dataType, producer, msg){
        startTime = now()
        try {
        for(i in msg){
        kafka::produce(producer, "topic-message", 1, i, true)
        }
        cost = now() - startTime
        writeLog("[Kafka Plugin] Successed to send " + dataType + " : " + msg.size() + " rows, " + cost + " ms.")
        }
        catch(ex) {writeLog("[Kafka Plugin] Failed to send msg to kafka with error: " +ex)}
        }


        // 创建 DolphinDB 流数据表 tickStream
        colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
        colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
        share(streamTable(35000000:0, colName, colType), `tickStream)


        // 订阅 tickStream,处理函数是 sendMsgToKafkaFunc
        subscribeTable(tableName="tickStream", actionName="sendMsgToKafka", offset=-1, handler=sendMsgToKafkaFunc{`tick, producer}, msgAsTable=true, reconnect=true, batchSize=10000,throttle=1)
        getHomeDir()
        // 控速回放 DolphinDB 分布式中的历史数据至 tickStream
        dbName = "dfs://SH_TSDB_tick"
        tbName = "tick"
        replayDay = 2021.12.08
        testData = select * from loadTable(dbName, tbName) where date(TradeTime)=replayDay, time(TradeTime)>=09:30:00.000, time(TradeTime)<=10:00:00.000 order by TradeTime, SecurityID
        submitJob("replay", "replay", replay, testData, objByName("tickStream"), `TradeTime, `TradeTime, 2000, true, 4)
        可以使用 kafka-console-consumer 命令行工具消费 Topic 为 topic-message
         中的数据,验证数据是否成功写入:
          ./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic topic-message
          3. 消费数据
          接下来为大家展示如何在 DolphinDB 中订阅消费 Kafka Topic 中的数据,以将其实时同步到流数据表中。
          • 创建消费者,订阅主题数据
            // 创建 Kafka Consumer 并返回句柄
            consumerCfg = dict(STRING, ANY)
            consumerCfg["metadata.broker.list"] = "192.193.168.5:8992"
            consumerCfg["group.id"] = "topic-message"
            consumer = kafka::consumer(consumerCfg)


            // 订阅 Kafka 中的 "topic-message" 主题的数据
            topics = ["topic-message"]
            kafka::subscribe(consumer, topics);
            • DolphinDB 订阅 Kafka 消息队列中的数据
              // 订阅 Kafka 发布消息,写入流表 tickStream_kafka
              colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
              colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
              share(streamTable(35000000:0, colName, colType), `tickStreamkafka)
              go


              // Kafka 消息解析函数
              def parse(mutable dictVar, mutable tickStreamkafka){
              try{
              t = dictVar
              t.replaceColumn!(`TradeTime, temporalParse(dictVar[`TradeTime],"yyyy.MM.ddTHH:mm:ss.SSS"))
              tickStreamkafka.append!(t);
              }catch(ex){
              print("kafka errors : " + ex)
              }
              }


              colType[1] = STRING;
              decoder = EncoderDecoder::jsonDecoder(colName, colType, parse{, tickStreamkafka}, 15, 100000, 0.5)


              // 创建 subjob 函数
              conn = kafka::createSubJob(consumer, , decoder, "topic-message")
              4. 流计算引擎实时计算K线

              我们使用 DolphinDB 内置流计算引擎计算分钟 K 线,并将结果输出到名为 OHLCVwap
              的结果表中。DolphinDB GUI 中执行以下脚本:
                // 创建接收实时计算结果的流数据表
                colName = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`Vwap
                colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
                share(streamTable(2000000:0, colName, colType), `OHLCStream)


                // K 线指标计算元表达式
                aggrMetrics = <[ first(TradePrice), max(TradePrice), min(TradePrice), last(TradePrice), wavg(TradePrice, TradeQty) ]>


                // 创建引擎并将 kafka 中订阅的数据注入流计算引擎
                createTimeSeriesEngine(name="OHLCVwap", windowSize=60000, step=60000, metrics=aggrMetrics, dummyTable=objByName("tickStreamkafka"), outputTable=objByName("OHLCStream"), useSystemTime=true, keyColumn=`SecurityID, useWindowStartTime=false)
                subscribeTable(tableName="tickStreamkafka", actionName="OHLCVwap", offset=-1, handler=getStreamEngine("OHLCVwap"), msgAsTable=true, batchSize=1000, throttle=1, hash=0)
                • 设置参数 offset 为 - 1,订阅将会从提交订阅时流数据表的当前行开始。
                • 设置 useSystemTime=true,表示时间序列引擎会按照数据注入时间序列引擎的时刻(毫秒精度的本地系统时间,与数据中的时间列无关),每隔固定时间截取固定长度窗口的流数据进行计算。
                本文展示了如何用 DolphinDB Kafka Plugin 中常用的接口函数实时计算K线,更多函数支持可参考官网文档,完整案例教程和脚本可点击文末阅读原文查看。使用过程中如果遇到任何问题,欢迎大家在项目仓库反馈。

                Explore More



                扫描二维码,添加 DolphinDB小助手
                点击阅读原文,查看完整案例和脚本

                文章转载自DolphinDB智臾科技,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论