CEP 引擎应用框架

灵活性与高扩展性:支持面向事件编程的 DolphinScript,提供了多种事件处理 API。 强大的计算引擎:支持与流计算引擎结合,方便对实时数据进行多种复杂计算,比如聚合计算、状态计算等,提升复杂策略的计算性能。 流批一体:支持将 DolphinDB 数据库中的数据回放至 CEP 引擎中,实现策略回测与批流一体。 可视化和监控:支持对接 DolphinDB 提供的多种运维工具和插件,用户可实时监控 CEP 引擎运行状态。 多种 API 写入:通过 DolphinDB API 或者插件,第三方客户端(如 Python 应用程序)或者其他第三方数据源(如 kafka)可以方便地将数据输入到 CEP 引擎或者订阅 CEP 引擎输出结果。
欲深入了解 CEP 引擎工作原理,请参考白皮书。前往 DolphinDB 官网,点击「开发者中心」—「白皮书」—「CEP 引擎」,即可下载获取完整资料。
初级高频量价因子策略实现
通过一个初级的事件化量价因子驱动的交易策略,我们来进一步了解 CEP 引擎如何在金融量化场景下发挥作用。
策略逻辑
我们可以基于股票逐笔成交数据,定义一个事件驱动策略,策略将根据每支股票最新的成交价涨幅和累计成交量,判断是否执行下单或撤单等操作。具体策略判断逻辑细节如下:
根据每一笔成交数据触发计算两个实时因子:最新成交价相对于15秒内最低成交价的涨幅 (变量 ROC)和过去1分钟的累计成交量 (变量 volume ); 在策略启动时设定每支股票的两个因子阈值(ROC0、volume0 ),每当实时因子值更新后判断是否 ROC > ROC0 且 volume > volume0。
若是,则触发下单; 若下单后1分钟内仍未成交,则触发对应的撤单。

定义事件类
class StockTick {securityid :: STRINGtime :: TIMESTAMPprice :: DOUBLEvolume :: INTdef StockTick(securityid_, time_, price_, volume_) {securityid = securityid_time = time_price = price_volume = volume_}}
class ExecutionReport {orderid :: STRINGsecurityid :: STRINGprice :: DOUBLEvolume :: INTdef ExecutionReport(orderid_, securityid_, price_, volume_) {orderid = orderid_securityid = securityid_price = price_volume = volume_}}
下单事件:
class NewOrder {orderid :: STRINGsecurityid :: STRINGprice :: DOUBLEvolume :: INTside :: INTtype :: INTdef NewOrder(orderid_, securityid_, price_, volume_, side_, type_) {orderid = orderid_securityid = securityid_price = price_volume = volume_side = side_type = type_}}
撤单事件:
class CancelOrder {orderid :: STRINGdef CancelOrder(orderid_) {orderid = orderid_}}
定义监视器和设置监听
监视器是 CEP 系统中负责监控并响应事件的组件,它包含了完整的业务逻辑。此案例中,我们定义一个监视器 StrategyMonitor,在此处封装整体的交易策略。一些策略相关的属性和变量,如策略编号和参数,可以在定义监视器类时进行定义和初始化。整体监视器的结构大致如下:
class StrategyMonitor {strategyid :: INT 策略编号strategyParams :: ANY 策略参数:策略标的、标的参数配置dataview :: ANY // Data View 监控def StrategyMonitor(strategyid_, strategyParams_) {strategyid = strategyid_strategyParams = strategyParams_}def execReportExceedTimeHandler(orderid, exceedTimeSecurityid)def execReportHandler(execReportEvent)def handleFactorCalOutput(factorResult)def tickHandler(tickEvent)def initDataView()def createFactorCalEngine()def onload(){initDataView()createFactorCalEngine()securityids = strategyParams.keys()addEventListener(handler=tickHandler, eventType="StockTick",condition=<StockTick.securityid in securityids>, times="all")}}
DolphinDB 的响应式状态引擎能够支持实时数据的高性能增量计算,本案例在 CEP 引擎内创建了名为 factorCal 的响应式计算引擎,对策略中的两个因子值进行计算。
创建 CEP 引擎
接下来,我们可以通过createCEPEngine
创建一个 CEP 引擎。注意在创建引擎时需要指定 StrategyMonitor 类作为 CEP 引擎的监视器,并传入 strategyid 和 strategyParams 作为实例化监视器方法的入参。
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody)share(streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody,array(STRING, 0) as orderid), "output")outputSerializer = streamEventSerializer(name=`serOutput,eventSchema=[NewOrder,CancelOrder], outputTable=objByName("output"),commonField="orderid")strategyid = 1strategyParams = dict(`300001`300002`300003,[dict(`ROCThreshold`volumeThreshold, [1,1000]),dict(`ROCThreshold`volumeThreshold, [1,2000]),dict(`ROCThreshold`volumeThreshold, [2, 5000])])engine = createCEPEngine(name='strategyDemo', monitors=<StrategyMonitor(strategyid,strategyParams)>, dummyTable=dummy, eventSchema=[StockTick,ExecutionReport],outputTable=outputSerializer)
策略可视化展示
CEP 引擎完成创建后,需要向引擎输入模拟数据。下列脚本首先模拟了120条逐笔成交事件写入 CEP 引擎,之后模拟了多条成交回报事件写入 CEP 引擎。
ids = `300001`300002`300003`600100`600800for (i in 1..120) {sleep(500)tick = StockTick(rand(ids, 1)[0], now()+1000*i, 10.0+rand(1.0,1)[0], 100*rand(1..10, 1)[0])getStreamEngine(`strategyDemo).appendEvent(tick)}sleep(1000*20)print("begin to append ExecutionReport")for (orderid in (exec orderid from output where eventType="NewOrder")){sleep(250)if(not orderid in (exec orderid from output where eventType="CancelOrder")) {execRep = ExecutionReport(orderid, split(orderid,"_")[1], 10, 100)getStreamEngine(`strategyDemo).appendEvent(execRep)}}

Explore More



扫描二维码,添加 DolphinDB 小助手
文章转载自DolphinDB智臾科技,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。





