暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

用复杂事件处理引擎实现一个高频量价因子策略

154
流数据是一种持续实时生成且动态变化的时间序列数据,涵盖了金融交易、物联网(IoT)传感器采集、物流运营、零售订单等各类持续生成动态数据的场景。随着业务的发展和数据的增长,想要及时准确地洞察数据背后的意义,必须通过一款高性能、可靠且易用的流数据处理工具,从大规模和复杂的实时数据流中提取事件信息,并对符合预定模式的事件进行实时分析和决策。
这里为大家介绍一个专为交易策略实现交易风控交易监控等业务设计的流计算引擎—— DolphinDB CEP (复杂事件处理) Engine。CEP 引擎集合了高吞吐、低时延、流批一体的优势,又在灵活性与事件描述语言等方面进行了优化。本文将带大家了解 CEP 引擎的应用框架,再通过一个实现高频量价因子策略的案例,为大家讲解如何使用 CEP 引擎处理事件流数据。

CEP 引擎应用框架

通过这张图,我们先来简单了解一下 DolphinDB CEP 引擎是如何在业务中发挥作用的。

CEP 应用框架主要具有以下特点:
  • 灵活性与高扩展性:支持面向事件编程的 DolphinScript,提供了多种事件处理 API。
  • 强大的计算引擎:支持与流计算引擎结合,方便对实时数据进行多种复杂计算,比如聚合计算、状态计算等,提升复杂策略的计算性能。
  • 流批一体:支持将 DolphinDB 数据库中的数据回放至 CEP 引擎中,实现策略回测与批流一体。
  • 可视化和监控:支持对接 DolphinDB 提供的多种运维工具和插件,用户可实时监控 CEP 引擎运行状态。
  • 多种 API 写入:通过 DolphinDB API 或者插件,第三方客户端(如 Python 应用程序)或者其他第三方数据源(如 kafka)可以方便地将数据输入到 CEP 引擎或者订阅 CEP 引擎输出结果。
欲深入了解 CEP 引擎工作原理,请参考白皮书。前往 DolphinDB 官网,点击「开发者中心」—「白皮书」—「CEP 引擎」,即可下载获取完整资料。

初级高频量价因子策略实现

通过一个初级的事件化量价因子驱动的交易策略,我们来进一步了解 CEP 引擎如何在金融量化场景下发挥作用。

策略逻辑


我们可以基于股票逐笔成交数据,定义一个事件驱动策略,策略将根据每支股票最新的成交价涨幅和累计成交量,判断是否执行下单或撤单等操作。具体策略判断逻辑细节如下:

  • 根据每一笔成交数据触发计算两个实时因子:最新成交价相对于15秒内最低成交价的涨幅 (变量 ROC)和过去1分钟的累计成交量 (变量 volume );
  • 在策略启动时设定每支股票的两个因子阈值(ROC0volume0 ),每当实时因子值更新后判断是否 ROC > ROC0 且 volume > volume0

    • 若是,则触发下单;
    • 若下单后1分钟内仍未成交,则触发对应的撤单。
此策略在 CEP 引擎中的实现逻辑如下图所示:

定义事件类


CEP 引擎的处理对象为事件,因此要先定义引擎内需要的事件。在 CEP 系统中,定义一个事件时需要指定事件类型及其包含的每个属性的名称和类型。
此案例中,需要定义的事件有股票逐笔成交事件:
    class StockTick {
    securityid :: STRING
    time :: TIMESTAMP
    price :: DOUBLE
    volume :: INT
    def StockTick(securityid_, time_, price_, volume_) {
    securityid = securityid_
    time = time_
    price = price_
    volume = volume_
    }
    }
    成交回报事件:
      class ExecutionReport { 
      orderid :: STRING
      securityid :: STRING
      price :: DOUBLE
      volume :: INT
      def ExecutionReport(orderid_, securityid_, price_, volume_) {
      orderid = orderid_
      securityid = securityid_
      price = price_
      volume = volume_
      }
      }

      下单事件:

        class NewOrder { 
        orderid :: STRING
        securityid :: STRING
        price :: DOUBLE
        volume :: INT
        side :: INT
        type :: INT
        def NewOrder(orderid_, securityid_, price_, volume_, side_, type_) {
        orderid = orderid_
        securityid = securityid_
        price = price_
        volume = volume_
        side = side_
        type = type_
        }
        }

        撤单事件:

          class CancelOrder { 
          orderid :: STRING
          def CancelOrder(orderid_) {
          orderid = orderid_
          }
          }

          定义监视器和设置监听


          监视器是 CEP 系统中负责监控并响应事件的组件,它包含了完整的业务逻辑。此案例中,我们定义一个监视器 StrategyMonitor,在此处封装整体的交易策略。一些策略相关的属性和变量,如策略编号和参数,可以在定义监视器类时进行定义和初始化。整体监视器的结构大致如下:

            class StrategyMonitor { 
            strategyid :: INT 策略编号
            strategyParams :: ANY 策略参数:策略标的、标的参数配置
            dataview :: ANY // Data View 监控
            def StrategyMonitor(strategyid_, strategyParams_) {
            strategyid = strategyid_
            strategyParams = strategyParams_
            }
            def execReportExceedTimeHandler(orderid, exceedTimeSecurityid)
            def execReportHandler(execReportEvent)
            def handleFactorCalOutput(factorResult)
            def tickHandler(tickEvent)
            def initDataView()
            def createFactorCalEngine()
            def onload(){
            initDataView()
            createFactorCalEngine()
            securityids = strategyParams.keys()
            addEventListener(handler=tickHandler, eventType="StockTick",
            condition=<StockTick.securityid in securityids>, times="all")
            }
            }
            上述代码涵盖了创建因子计算引擎、创建数据视图、启动事件监听器等操作,每步操作的具体代码,请点击阅读原文查看详细脚本。

            DolphinDB 的响应式状态引擎能够支持实时数据的高性能增量计算,本案例在 CEP 引擎内创建了名为 factorCal 的响应式计算引擎,对策略中的两个因子值进行计算。

            创建 CEP 引擎


            接下来,我们可以通过createCEPEngine
            创建一个 CEP 引擎。注意在创建引擎时需要指定 StrategyMonitor 类作为 CEP 引擎的监视器,并传入 strategyid 和 strategyParams 作为实例化监视器方法的入参。

              dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody)
              share(streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody,
              array(STRING, 0) as orderid), "output")
              outputSerializer = streamEventSerializer(name=`serOutput,
              eventSchema=[NewOrder,CancelOrder], outputTable=objByName("output"),
              commonField="orderid")
              strategyid = 1
              strategyParams = dict(`300001`300002`300003,
              [dict(`ROCThreshold`volumeThreshold, [1,1000]),
              dict(`ROCThreshold`volumeThreshold, [1,2000]),
              dict(`ROCThreshold`volumeThreshold, [2, 5000])])
              engine = createCEPEngine(name='strategyDemo', monitors=<StrategyMonitor(strategyid,
              strategyParams)>, dummyTable=dummy, eventSchema=[StockTick,ExecutionReport],
              outputTable=outputSerializer)

              策略可视化展示


              CEP 引擎完成创建后,需要向引擎输入模拟数据。下列脚本首先模拟了120条逐笔成交事件写入 CEP 引擎,之后模拟了多条成交回报事件写入 CEP 引擎。

                ids = `300001`300002`300003`600100`600800
                for (i in 1..120) {
                sleep(500)
                tick = StockTick(rand(ids, 1)[0], now()+1000*i, 10.0+rand(1.0,1)[0], 100*rand(1..10, 1)[0])
                getStreamEngine(`strategyDemo).appendEvent(tick)
                }
                sleep(1000*20)
                print("begin to append ExecutionReport")
                for (orderid in (exec orderid from output where eventType="NewOrder")){
                sleep(250)
                if(not orderid in (exec orderid from output where eventType="CancelOrder")) {
                execRep = ExecutionReport(orderid, split(orderid,"_")[1], 10, 100)
                getStreamEngine(`strategyDemo).appendEvent(execRep)
                }
                }
                在定义监视器的步骤中,我们创建了一个数据视图引擎(DataView Engine),其能够帮助监控策略运行时数据的最新状态
                数据视图(DataView) 是在某一时刻对特定数据的快照。在 CEP 系统中,DataView 用于持续追踪和显示 CEP 引擎处理的中间变量或监控值的最新状态。这些监控值可能是由事件触发并在事件处理过程中动态计算和更新的,例如交易量、价格等随事件不断变化的量。数据视图引擎负责维护和管理一个或多个 DataView ,允许 CEP 引擎在运行过程中将监控值写入到这些 DataView 中。数据视图引擎还会保存每个监控值的最新快照,并将这些数据输出到目标表(通常是流数据表),使得这些数据可以被其他程序订阅或查询。
                本案例中,当策略运行时,某一时刻的 DataView 情况如下图所示:


                通过 CEP 引擎,联动 DolphinDB 的响应式状态引擎进行高性能的实时计算,我们实现了上述的事件化量价因子驱动策略,详细步骤和具体脚本可点击阅读原文了解更多。
                未来我们将介绍一个更加复杂、也更加贴近生产情况的量化策略。该策略不仅涉及到通过数据回放、 模拟撮合引擎对真实交易情况进行仿真,还包括计算 MACD 和 CCI 等复杂指标、以及在策略参数寻优时进行并行计算等高阶用法,最终通过 Dashboard 可视化策略运行的结果,敬请期待~

                Explore More



                扫描二维码,添加 DolphinDB 小助手

                点击阅读原文,详细了解 CEP 引擎应用案例

                文章转载自DolphinDB智臾科技,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论