暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

分钟频、历史值…实时关联分析操作指南

908

前两期中,我们为大家展示了如何用 Window Join、Asof Join 引擎将逐笔成交数据与快照数据进行关联分析,以及使用 Left Semi Join 补充原始委托信息,这些都是金融中常见的应用场景,感兴趣的小伙伴可以点击下方跳转阅读。

本期,我们将为大家介绍剩下的三个场景:
- 实时计算股票与某指数的分钟收益率相关性;
- 对多个数据源降频采样,计算分钟指标并将结果关联到同一张表中;
- 根据快照数据实时匹配股票历史日频指标。


Equal Join

不同数据源的

分钟指标实时合并

在量化实盘中,往往需要对原始的行情快照、逐笔成交数据进行降采样处理,以得到分钟频指标,作为策略研发的输入数据,这就要求将多个不同数据源计算出的指标关联到同一张表中
本例将对快照和成交数据分别做实时的 1 分钟聚合,并将计算所得指标关联后输出到同一张宽表中。
这个场景的特征是,每支股票的快照和逐笔分钟指标在每一分钟只有一条记录,具有唯一性,并且在某一分钟的输出上,期望总是在两类指标都计算完成后再关联输出。
用 Equal Join 引擎实现此场景的脚本如下:
    // create table
    share streamTable(1:0, `Sym`TradeTime`Side`TradeQty, [SYMBOL, TIME, INT, LONG]) as trades
    share streamTable(1:0, `UpdateTime`Sym`BuyTradeQty`SellTradeQty, [TIME, SYMBOL, LONG, LONG]) as tradesMin
    share streamTable(1:0, `Sym`Time`Bid1Price`Bid1Qty, [SYMBOL, TIME, DOUBLE, LONG]) as snapshot
    share streamTable(1:0, `UpdateTime`Sym`AvgBid1Amt, [TIME, SYMBOL, DOUBLE]) as snapshotMin
    share streamTable(1:0, `UpdateTime`Sym`AvgBid1Amt`BuyTradeQty`SellTradeQty, [TIME, SYMBOL, DOUBLE, LONG, LONG]) as output


    // create engine:
    eqJoinEngine = createEqualJoinEngine(name="EqualJoin", leftTable=tradesMin, rightTable=snapshotMin, outputTable=output, metrics=<[AvgBid1Amt, BuyTradeQty, SellTradeQty]>, matchingColumn=`Sym, timeColumn=`UpdateTime)
    // create engine:
    tsEngine1 = createTimeSeriesEngine(name="tradesAggr", windowSize=60000, step=60000, metrics=<[sum(iif(Side==1, 0, TradeQty)), sum(iif(Side==2, 0, TradeQty))]>, dummyTable=trades, outputTable=getLeftStream(eqJoinEngine), timeColumn=`TradeTime, keyColumn=`Sym, useSystemTime=false, fill=(0, 0))
    // create engine:
    tsEngine2 = createTimeSeriesEngine(name="snapshotAggr", windowSize=60000, step=60000, metrics=<[avg(iif(Bid1Price!=NULL, Bid1Price*Bid1Qty, 0))]>, dummyTable=snapshot, outputTable=getRightStream(eqJoinEngine), timeColumn=`Time, keyColumn=`Sym, useSystemTime=false, fill=(0.0))


    // subscribe topic
    subscribeTable(tableName="trades", actionName="minAggr", handler=tsEngine1, msgAsTable=true, offset=-1, hash=1)
    subscribeTable(tableName="snapshot", actionName="minAggr", handler=tsEngine2, msgAsTable=true, offset=-1, hash=2)

    首先用两个独立的时序聚合引擎(createTimeSeriesEngine)对原始的快照和成交数据流按数据中的时间戳做实时聚合,输出每一分钟的指标;然后通过引擎级联的方式,将两个时序聚合引擎的输出分别作为左右表注入连接引擎。

    构造数据写入作为原始输入的 2 个流数据表,先写入右表,再写入左表:
      // generate data: snapshot
      t1 = table(`A`B`A`B`A`B as Sym, 10:00:52.000+(3 3 6 6 9 9)*1000 as Time, (3.5 7.6 3.6 7.6 3.6 7.6) as Bid1Price, (1000 2000 500 1500 400 1800) as Bid1Qty)
      // generate data: trade
      t2 = table(`A`A`B`A`B`B`A`B`B`A as Sym, 10:00:54.000+(1..10)*700 as TradeTime, (1 2 1 1 1 1 2 1 2 2) as Side, (1..10) * 10 as TradeQty)
      // input
      trades.append!(t2)
      snapshot.append!(t1)

      关联得到的结果表 output 如下:


      Lookup Join

      根据快照数据

      实时匹配历史日频指标

      在当日的实时计算中,有时还需要依赖历史指标,本例中我们基于行情快照数据,通过匹配股票代码关联昨日的日频指标
      这个场景的特征是,每条快照记录到达后要求立刻关联输出,如果日频数据里没有对应的股票,输出结果对应的字段为空,输出与原始输入中的每一条行情快照记录一一对应。同时,日频指标并非实时数据,而是一个以较低频率更新的有主键的离线数据集
      用 Lookup Join 引擎实现此场景的脚本如下:
        // create table
        share streamTable(1:0, `Sym`Time`Open`High`Low`Close, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as snapshot
        historicalData = table(`A`B as Sym, (0.8 0.2) as PreWeight, (3.1 7.6) as PreClose)
        share table(1:0, `Sym`Time`Open`High`Low`Close`PreWeight`PreClose, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as output


        // create engine
        lookupJoinEngine = createLookupJoinEngine(name="lookupJoin", leftTable=snapshot, rightTable=historicalData, outputTable=output, metrics=<[Time, Open, High, Low, Close, PreWeight, PreClose]>, matchingColumn=`Sym, checkTimes=10s)


        // subscribe topic
        subscribeTable(tableName="snapshot", actionName="appendLeftStream", handler=getLeftStream(lookupJoinEngine), msgAsTable=true, offset=-1)

        构造数据写入作为引擎左表输入的流数据表 snapshot:

          // generate data: snapshot
          t1 = table(`A`B`A`B`A`B as Sym, 10:00:00.000+(3 3 6 6 9 9)*1000 as Time, (3.5 7.6 3.5 7.6 3.5 7.6) as Open, (3.5 7.6 3.6 7.6 3.6 7.6) as High, (3.5 7.6 3.5 7.6 3.4 7.5) as Low, (3.5 7.6 3.5 7.6 3.6 7.5) as Close)
          snapshot.append!(t1)

          输入数据与关联关系如下:

          结果在左表数据到达引擎时立刻输出,关联得到的结果表 output 如下:


          Left Semi Join

          实时计算股票与某指数的

          分钟收益率相关性

          Left Semi Join 引擎的连接机制类似于 SQL 中的 equal join ,按连接列等值关联左右表。上一期我们利用这一功能实现了在成交数据的基础上匹配委托订单,丰富原始的委托信息。本例中,我们来实时计算股票和某个指数在过去一段时间内分钟收益率的相关性
          这个场景的特征是,两个数据流的时间戳频率一致,全部股票都需要关联同一支指数,输入是已经降为分钟频率的股票和指数数据,输出与原始输入中的股票数据一一对应
          可以用如下脚本实现关联:
            // create table
            share streamTable(1:0, `Sym`Time`Close, [SYMBOL, TIME, DOUBLE]) as stockKline
            share streamTable(1:0, `Sym`Time`Close, [SYMBOL, TIME, DOUBLE]) as indexKline
            share streamTable(1:0, `Time`Sym`Close`Index1Close, [TIME, SYMBOL, DOUBLE, DOUBLE]) as stockKlineAddIndex1
            share streamTable(1:0, `Sym`Time`Close`Index1Close`Index1Corr, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE]) as output


            // create engine: calculate correlation
            rsEngine = createReactiveStateEngine(name="calCorr", dummyTable=stockKlineAddIndex1, outputTable=output, metrics=[<Time>, <Close>, <Index1Close>, <mcorr(ratios(Close)-1, ratios(Index1Close)-1, 3)>], keyColumn="Sym")


            // create engine: left join Index1
            ljEngine1 = createLeftSemiJoinEngine(name="leftJoinIndex1", leftTable=stockKline, rightTable=indexKline, outputTable=getStreamEngine("calCorr"), metrics=<[Sym, Close, indexKline.Close]>, matchingColumn=`Time)


            // subscribe topic
            def appendIndex(engineName, indexName, msg){
            tmp = select * from msg where Sym = indexName
            getRightStream(getStreamEngine(engineName)).append!(tmp)
            }
            subscribeTable(tableName="indexKline", actionName="appendIndex1", handler=appendIndex{"leftJoinIndex1", "idx1"}, msgAsTable=true, offset=-1, hash=1)
            subscribeTable(tableName="stockKline", actionName="appendStock", handler=getLeftStream(ljEngine1), msgAsTable=true, offset=-1, hash=0)

            这里连接引擎的输出会直接注入响应式状态引擎进行下一步计算,多个引擎之间采用了引擎级联的方式处理。

            构造数据写入作为原始输入的 2 个流数据表:
              // generate data: stock Kline
              t1 = table(`A`B`A`B`A`B`A`B`A`B as Sym, 10:00:00.000+(0 0 1 1 2 2 3 3 4 4)*60000 as Time, (4.1 7.6 3.8 7.6 4.3 7.5 3.5 7.6 4.2 7.6) as Close)
              // generate data: index Kline
              t2 = table(`idx1`idx2`idx1`idx2`idx1`idx2`idx1`idx2`idx1`idx2 as Sym, 10:00:00.000+(0 0 1 1 2 2 3 3 4 4)*60000 as Time, (2.1 5 2.2 5 1.9 5 1.7 5 1.7 5) as Close)
              // input data
              indexKline.append!(t2)
              stockKline.append!(t1)

              输入数据与关联关系如下:

              关联得到的结果表 output 如下:

              至此,我们已经为大家展示了所有六个应用场景,囊括了 Asof Join、Window Join、Equal Join、Lookup Join、Left Semi Join 这五个流数据连接引擎。
              💡完整的流数据连接引擎教程已发布在知乎,文章详细介绍了每个引擎的实现原理,以及各个场景下的参数配置,感兴趣的小伙伴可以点击阅读原文查看。
              这些引擎均内置实现了高效的关联计算实时触发规则内存管理机制,很好地解决了对齐难、触发难、缓存难、计算难等一系列问题。开发人员通过简单的引擎参数配置,便能快速实现复杂的实时关联需求。
              在这些连接引擎的基础上,再结合 DolphinDB 流数据框架中其他流计算引擎流水线处理并行计算等重要特性,开发人员便可以高效实现业务场景实时化,掌握更及时的信息、挖掘更多的业务价值。
              五大连接引擎的信息对比如下表所示:

              💡大家还有哪些关心的话题,

              💡或想了解的解决方案呢?

              💡欢迎在评论区或公众号后台积极留言~

              Explore More



              扫描二维码,添加 DolphinDB小助手
              点击阅读原文,查看完整教程

              文章转载自DolphinDB智臾科技,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论