DolphinDB Kafka 插件支持把 DolphinDB 中生产的数据推送到 Kafka,也支持从 Kafka 订阅数据,并在 DolphinDB 中消费。本文将为大家展示如何通过该插件实时计算k线。DolphinDB Kafka 插件的完整实践指南已发布在官方知乎,可点击阅读原文查看。
DolphinDB Kafka 插件
DolphinDB 标量 Kafka Java API 的内置类型:String(UTF-8) , Short , Integer , Long , Float , Double , Bytes , byte[] 以及 ByteBuffer 以上数据类型所组成的向量

案例:如何实时计算K线
部署 DolphinDB 集群,版本为
v2.00.7
。部署 Kafka 集群,版本为
2.13-3.1.0
。
详细部署教程可点击阅读原文查看。
Kafka 创建 Topic
./bin/kafka-topics.sh --create --topic topic-message --bootstrap-server 192.193.168.4:9092Created topic topic-message.
加载 Kafka 插件并创建 Kafka Producer
// 加载插件path = "/DolphinDB/server/plugins/kafka"loadPlugin(path + "/PluginKafka.txt")loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt");// 定义创建 Kafka Producer 的函数def initKafkaProducerFunc(metadataBrokerList){producerCfg = dict(STRING, ANY)producerCfg["metadata.broker.list"] = metadataBrokerListreturn kafka::producer(producerCfg)}// 创建 Kafka Producer 并返回句柄producer = initKafkaProducerFunc("192.193.168.5:8992")
推送数据到 Kafka Topic
// 定义推送数据到 KafKa "topic-message" Topic 的函数def sendMsgToKafkaFunc(dataType, producer, msg){startTime = now()try {for(i in msg){kafka::produce(producer, "topic-message", 1, i, true)}cost = now() - startTimewriteLog("[Kafka Plugin] Successed to send " + dataType + " : " + msg.size() + " rows, " + cost + " ms.")}catch(ex) {writeLog("[Kafka Plugin] Failed to send msg to kafka with error: " +ex)}}// 创建 DolphinDB 流数据表 tickStreamcolName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndexcolType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]share(streamTable(35000000:0, colName, colType), `tickStream)// 订阅 tickStream,处理函数是 sendMsgToKafkaFuncsubscribeTable(tableName="tickStream", actionName="sendMsgToKafka", offset=-1, handler=sendMsgToKafkaFunc{`tick, producer}, msgAsTable=true, reconnect=true, batchSize=10000,throttle=1)getHomeDir()// 控速回放 DolphinDB 分布式中的历史数据至 tickStreamdbName = "dfs://SH_TSDB_tick"tbName = "tick"replayDay = 2021.12.08testData = select * from loadTable(dbName, tbName) where date(TradeTime)=replayDay, time(TradeTime)>=09:30:00.000, time(TradeTime)<=10:00:00.000 order by TradeTime, SecurityIDsubmitJob("replay", "replay", replay, testData, objByName("tickStream"), `TradeTime, `TradeTime, 2000, true, 4)
topic-message中的数据,验证数据是否成功写入:
./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic topic-message
创建消费者,订阅主题数据
// 创建 Kafka Consumer 并返回句柄consumerCfg = dict(STRING, ANY)consumerCfg["metadata.broker.list"] = "192.193.168.5:8992"consumerCfg["group.id"] = "topic-message"consumer = kafka::consumer(consumerCfg)// 订阅 Kafka 中的 "topic-message" 主题的数据topics = ["topic-message"]kafka::subscribe(consumer, topics);
DolphinDB 订阅 Kafka 消息队列中的数据
// 订阅 Kafka 发布消息,写入流表 tickStream_kafkacolName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndexcolType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]share(streamTable(35000000:0, colName, colType), `tickStreamkafka)go// Kafka 消息解析函数def parse(mutable dictVar, mutable tickStreamkafka){try{t = dictVart.replaceColumn!(`TradeTime, temporalParse(dictVar[`TradeTime],"yyyy.MM.ddTHH:mm:ss.SSS"))tickStreamkafka.append!(t);}catch(ex){print("kafka errors : " + ex)}}colType[1] = STRING;decoder = EncoderDecoder::jsonDecoder(colName, colType, parse{, tickStreamkafka}, 15, 100000, 0.5)// 创建 subjob 函数conn = kafka::createSubJob(consumer, , decoder, "topic-message")
OHLCVwap的结果表中。DolphinDB GUI 中执行以下脚本:
// 创建接收实时计算结果的流数据表colName = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`VwapcolType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]share(streamTable(2000000:0, colName, colType), `OHLCStream)// K 线指标计算元表达式aggrMetrics = <[ first(TradePrice), max(TradePrice), min(TradePrice), last(TradePrice), wavg(TradePrice, TradeQty) ]>// 创建引擎并将 kafka 中订阅的数据注入流计算引擎createTimeSeriesEngine(name="OHLCVwap", windowSize=60000, step=60000, metrics=aggrMetrics, dummyTable=objByName("tickStreamkafka"), outputTable=objByName("OHLCStream"), useSystemTime=true, keyColumn=`SecurityID, useWindowStartTime=false)subscribeTable(tableName="tickStreamkafka", actionName="OHLCVwap", offset=-1, handler=getStreamEngine("OHLCVwap"), msgAsTable=true, batchSize=1000, throttle=1, hash=0)
设置参数 offset 为 - 1,订阅将会从提交订阅时流数据表的当前行开始。 设置 useSystemTime=true,表示时间序列引擎会按照数据注入时间序列引擎的时刻(毫秒精度的本地系统时间,与数据中的时间列无关),每隔固定时间截取固定长度窗口的流数据进行计算。
Explore More

文章转载自DolphinDB智臾科技,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。







