综合交易平台(Comprehensive Transaction Platform),简称 CTP,是上海期货信息技术有限公司专门为期货公司开发的一套期货经纪业务管理系统,承载了海量交易指令与行情数据的实时传输。
为实现 CTP 行情数据与数据库的高效对接,我们基于官方 CTP C++ API 开发了 DolphinDB CTP 行情插件,将复杂的 CTP 原生接口封装为易用的脚本函数。目前,DolphinDB CTP 插件同时支持 CTP 标准版与 CTPMini 版接口,用户可以通过灵活配置实现行情数据的全自动化采集与存储。大致流程如下:

CTP 插件接入实时行情数据
尽管 DolphinDB CTP 插件极大降低了数据接入门槛,但在实际应用过程中,用户仍然面临一些问题。根据用户反馈,我们将这些问题归纳为三类:合约代码信息获取困难、如何进行数据标准化处理、收盘后有测试数据干扰。接下来,我们将逐一解析这些问题并提供相应的解决方案。
合约代码信息获取困难
CTP 行情数据以合约代码(InstrumentID)为索引,但获取完整的合约清单需通过交易账号登录查询,而行情端账号无此权限。若缺乏有效的合约列表,用户只能手动维护代码表,既耗时又容易遗漏品种变更。
针对这一问题,如果有 CTP 交易端账号,可以通过登录交易端账号,调用 ctp::queryInstrument() 或 ctpMini::queryInstrument() 函数查询并返回包含所有合约代码的信息表。
// 在 ctp 中调用ids =exec instrumentIDfrom ctp::queryInstrument(ip, port, userID, password, brokerID, appID, authCode,[exchangeID], [waitTimes])// 在 ctpMini 中调用ids =exec instrumentIDfrom ctpMini::queryInstrument(ip, port, userID, password, brokerID, appID,authCode, [exchangeID], [waitTimes])
如果没有 CTP 交易端账号,可以自行构建合约信息表。这里有两种方式:(1)将需要订阅的合约信息存储成文本文件,通过读取 csv 获得;(2)将需要订阅的合约信息存在分布式表里,通过查询分布式表获得。大家可以根据实际需求创建期货期权合约代码的静态信息表,以下示例为基于最基础的数据创建信息表:
// 构建合约代码存储的数据库dbName = "dfs://future_basic"db = database(dbName, VALUE, 2024.01.01..2024.12.31)// 构建合约代码存储的分布式表create table "dfs://future_basic"."future_basic"(trade_date DATEtrade_code SYMBOLexchange SYMBOL)// 加载合约信息表并存储在创建好的分布式表中t = loadText("./future_basic.csv")loadTable("dfs://future_basic", "future_basic").append!(t)
静态信息表结构如下:

接着,我们从合约信息表中获取需要订阅的合约范围:
ids =exec trade_codefrom loadTable("dfs://future_basic", "future_basic")where trade_date=today()
如何进行数据标准化处理
CTP 原始数据包含很多需要进行二次加工的字段,我们需要对它们进行标准化处理。例如,原始数据中的 ExchangeID 字段都是空值,需要关联静态表补充交易所信息;不同交易所对夜盘数据的 TradingDay(交易日)与 ActionDay(实际日期)定义不同,直接存储会导致数据分区错乱;每天早上 6-8 点,CTP 服务器会重复发送前一天夜盘的行情,如果不加以过滤,会造成数据冗余。
为此,我们需要关注 exchange_id、trade_date / action_date 以及重复数据的处理。
针对 exchange_id,可以在创建订阅之前,先将需要的静态信息存为一个共享内存表,然后在订阅的回调函数里,通过 InstrumentID 和 trade_code 字段进行关联,获取静态信息表中的 exchange 信息。代码如下:
// 将静态信息存为一个共享内存表tmp = select trade_code, exchange from loadTable("dfs://future_basic","future_basic") where trade_date=today()share(tmp, "basicInfoTable")
针对trade_date 和 action_date,由于不同交易所的原始行情中 ActionDay 和 TradingDay 的规则不同 ,所以为了将同一天的交易数据存在同一个日期分区里,需要对 ActionDay 和 TradingDay 进行处理。我们可以调用 temporalAdd 函数对日期进行加减计算。例如,temporalAdd(trade_date, -1, `DCE) 就是按照 DCE(大商所)的交易日历找 trade_date 的前一个交易日期。
为了避免重复写入数据,可以将数据里的 tradeTime 和 receivedTime 作为过滤条件对数据进行过滤。例如,设置 20:40~03:00 和 08:40~15:30 这两个时间段,只有 tradeTime 和 receivedTime 都在设置的时间段内,才将数据落库。
盘后测试数据干扰
由于每日收盘后 CTP 插件仍会接收一些测试数据,所以我们需要及时关闭接口,以避免无效数据流入。如果都需要人工进行操作,会导致效率低下。
因此,我们可以调用 scheduleJob 函数设定 CTP 连接和关闭的定时任务,参考代码如下:
// 由于 scheduleJob 中的 jobFunc 必须是没有参数的函数,所以首先定义关闭 CTP 连接函数def ctp_dailyclose(){ip = "111.111.111.11"port = 11111config={"OutputElapsed": true,"ReceivedTime": true,"ConcatTime": true}conn = ctp::connect(ip, port, config)try{ ctp::close(conn) }catch(ex){print ex}}scheduleJob(jobId=`daily_open, jobDesc="Daily open job",jobFunc=run{"startup.dos"}, scheduleTime=[08:55m, 20:55m], startDate=2024.08.27,endDate=2025.08.27, frequency='D')scheduleJob(jobId=`daily_close, jobDesc="Daily close job", jobFunc=ctp_dailyclose,scheduleTime=[03:00m, 17:00m], startDate=2024.08.27, endDate=2025.08.27,frequency='D')
希望上述内容能够帮助大家解决合约代码信息获取困难、跨市场日期混乱、无效数据干扰等问题,让大家从繁琐的操作中解放出来,专注于核心业务分析。
除了 CTP 插件外,DolphinDB 还提供了恒生 NSQ、通联、华泰 Insight 等多家主流平台的行情接入插件。同时,DolphinDB 也提供数据存取、消息队列、机器学习等多种功能插件,为用户带来高效便捷的开发体验。

欢迎点击“阅读原文”,前往 DolphinDB 插件市场了解更多详情~
Explore More








