暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何在 DolphinDB 中计算 OHLC 条形图

原创 Handsome BOY 2022-10-25
709

本文将介绍如何使用历史数据和实时数据计算 OHLC 条形图。

可以在 DolphinDB 中的各种场景下高效地计算 OHLC 条形图。本教程将介绍如何使用历史数据和实时数据计算 OHLC 条形图。

  • 有历史数据

我们将解释如何在以下场景中使用批量计算来计算 OHLC 条形图:

  • 需要指定 OHLC 窗口的启动时间;
  • 一天中有多个交易时段,包括隔夜时段;
  • 重叠 OHLC 窗口;
  • OHLC 窗口根据交易量划分。

如果数据量很大,需要将结果写入数据库,我们可以使用 DolphinDB 内置的 Map-Reduce 功能进行并行计算。

  • 有实时数据

使用 API 实时接收市场数据,并使用 DolphinDB 内置的时序引擎进行实时计算。

1.用历史数据计算(批量计算)

要使用历史数据计算 OHLC 条形图,您可以使用 DolphinDB 的内置函数bardailyAlignedBarwj.

1.1 不指定 OHLC Windows 的启动时间

函数bar一般用于根据指定的区间对数据进行分组。

date = 09:32m 09:33m 09:45m 09:49m 09:56m 09:56m; bar(date, 5);

结果是:

[09:30m,09:30m,09:45m,09:45m,09:55m,09:55m]

示例 1

n = 1000000 date = take(2019.11.07 2019.11.08, n) time = (09:30:00.000 + rand(int(6.5*60*60*1000), n)).sort!() timestamp = concatDateTime(date, time) price = 100+cumsum(rand(0.02, n)-0.01) volume = rand(1000, n) symbol = rand(`AAPL`FB`AMZN`MSFT, n) trade = table(symbol, date, time, timestamp, price, volume).sortBy!(`symbol`timestamp) undef(`date`time`timestamp`price`volume`symbol);

计算 5 分钟 OHLC 条形图:

barMinutes = 5m OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, date, bar(time, barMinutes) as barStart

bar的参数_interval_支持DURATION类型。这里的“5m”表示5分钟。

1.2 需要指定OHLC窗口的启动时间

使用函数dailyAlignedBar指定 OHLC 窗口的开始时间。此功能可以容纳每天多个交易时段以及隔夜时段。

请注意,对于 function dailyAlignedBar,时间列的数据类型可以是 SECOND、TIME、NANOTIME、DATETIME、TIMESTAMP 和 NANOTIMESTAMP。指定每个交易时段开始时间的参数_timeOffset_必须具有相应的数据类型:分别为 SECOND、TIME 或 NANOTIME。

示例 2(每天一个交易时段):使用示例 1 中的相同表格“交易”计算 7 分钟 OHLC bar

barMinutes = 7m OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, 09:30:00.000, barMinutes) as bar

例3(每天两个交易时段):中国股市每天有两个交易时段,上午9:30-11:30和下午13:00-15:00。

使用以下脚本生成模拟数据:

n = 1000000 date = take(2019.11.07 2019.11.08, n) time = (09:30:00.000 + rand(2*60*60*1000, n/2)).sort!() join (13:00:00.000 + rand(2*60*60*1000, n/2)).sort!() timestamp = concatDateTime(date, time) price = 100+cumsum(rand(0.02, n)-0.01) volume = rand(1000, n) symbol = rand(`600519`000001`600000`601766, n) trade = table(symbol, timestamp, price, volume).sortBy!(`symbol`timestamp) undef(`date`time`timestamp`price`volume`symbol)

计算 7 分钟 OHLC 条形图:

barMinutes = 7m sessionsStart=09:30:00.000 13:00:00.000 sessionsEnd=11:30:00.000 15:00:00.000 OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes, sessionsEnd) as bar

示例 4(每天两个交易时段,隔夜时段):一些期货每天有多个交易时段,包括隔夜时段。在本例中,第一个交易时段为上午 8:45 至下午 13:45,另一个时段为隔夜时段,从下午 15:00 至次日凌晨 05:00。

使用以下脚本生成模拟数据:

daySession = 08:45:00.000 : 13:45:00.000 nightSession = 15:00:00.000 : 05:00:00.000 n = 1000000 timestamp = rand(concatDateTime(2019.11.06, daySession[0]) .. concatDateTime(2019.11.08, nightSession[1]), n).sort!() price = 100+cumsum(rand(0.02, n)-0.01) volume = rand(1000, n) symbol = rand(`A120001`A120002`A120003`A120004, n) trade = select * from table(symbol, timestamp, price, volume) where timestamp.time() between daySession or timestamp.time()>=nightSession[0] or timestamp.time()<nightSession[1] order by symbol, timestamp undef(`timestamp`price`volume`symbol);

计算 7 分钟 OHLC 条形图:

barMinutes = 7 sessionsStart = [daySession[0], nightSession[0]] OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart;

1.3 重叠 OHLC 窗口

在上述示例中,OHLC 窗口不重叠。要计算重叠的 OHLC 窗口,我们可以使用函数wj (窗口连接)。使用该wj函数,左表的每一行对应右表的一个窗口,可以在这个窗口上进行计算。

示例 5: 每天两个交易时段,OHLC 窗口重叠

模拟中国股市数据,每 5 分钟计算一个 30 分钟 OHLC 条形图。

n = 1000000 sampleDate = 2019.11.07 symbols = `600519`000001`600000`601766 trade = table(take(sampleDate, n) as date, (09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time, rand(symbols, n) as symbol, 100+cumsum(rand(0.02, n)-0.01) as price, rand(1000, n) as volume)

首先生成 OHLC 窗口,然后使用函数cj(交叉连接)生成股票代码和 OHLC 窗口的组合。

barWindows = table(symbols as symbol).cj(table((09:30:00.000 + 0..23 * 300000).join(13:00:00.000 + 0..23 * 300000) as time))

然后使用函数wj计算具有重叠窗口的 OHLC 条形图:

OHLC = wj(barWindows, trade, 0:(30*60*1000), <[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume]>, `symbol`time)

1.4 根据交易量确定窗口

上述所有示例中的窗口都是随时间确定的。您还可以希望使用其他变量(例如交易量)作为确定窗口的基础。

示例 6:每次交易量增加 1,000,000 时计算 OHLC 条形图。

n = 1000000 sampleDate = 2019.11.07 symbols = `600519`000001`600000`601766 trade = table(take(sampleDate, n) as date, (09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time, rand(symbols, n) as symbol, 100+cumsum(rand(0.02, n)-0.01) as price, rand(1000, n) as volume) volThreshold = 1000000 t = select first(time) as barStart, first(price) as open, max(price) as high, min(price) as low, last(price) as close, last(cumvol) as cumvol from (select symbol, time, price, cumsum(volume) as cumvol from trade context by symbol) group by symbol, bar(cumvol, volThreshold) as volBar;

1.5 使用 MapReduce 加速计算

如果需要从数据库中提取大规模的历史数据,计算OHLC条形图,然后保存到数据库中,可以使用内置的MapReduce函数[mr](https://www.dolphindb.com/help/FunctionsandCommands/FunctionReferences/m/mr.html)进行并行加载和计算。这种方法可以显着提高速度。

此示例使用美国股市交易数据。原始数据存储在数据库“dfs://TAQ”中的“trades”表中,具有复合分区:基于交易日期的值分区和基于股票代码的范围分区。

(1) 将磁盘上表的元数据加载到内存中:

login(`admin, `123456) db = database("dfs://TAQ") trades = db.loadTable("trades")

(2) 创建一个模板表’model’,然后根据模板表的schema在数据库“dfs://TAQ”中创建一个空表’OHLC’来存储结果:

model=select top 1 Symbol, Date, Time.second() as bar, PRICE as open, PRICE as high, PRICE as low, PRICE as close, SIZE as volume from trades where Date=2007.08.01, Symbol=`EBAY if(existsTable("dfs://TAQ", "OHLC")) db.dropTable("OHLC") db.createPartitionedTable(model, `OHLC, `Date`Symbol)

(3) 使用函数mr计算 OHLC 条形图并将结果写入表 ‘OHLC’:

def calcOHLC(inputTable){ tmp=select first(PRICE) as open, max(PRICE) as high, min(PRICE) as low, last(PRICE) as close, sum(SIZE) as volume from inputTable where Time.second() between 09:30:00 : 15:59:59 group by Symbol, Date, 09:30:00+bar(Time.second()-09:30:00, 5*60) as bar loadTable("dfs://TAQ", `OHLC).append!(tmp) return tmp.size() } ds = sqlDS(<select Symbol, Date, Time, PRICE, SIZE from trades where Date between 2007.08.01 : 2019.08.01>) mr(ds, calcOHLC, +)
  • ‘ds’ 是函数生成的一系列数据源sqlDS。每个数据源代表一个分区中的数据。
  • FunctioncalcOHLC是 MapReduce 中的映射函数。它从每个数据源计算 OHLC 条形图,将结果写入数据库并返回写入数据库的行数。
  • “+”是 MapReduce 中的 reduce 函数。它将所有映射函数的结果(写入数据库的行数)相加,以返回写入数据库的总行数。

2.实时计算

下图描述了在 DolphinDB 中实时计算 OHLC 条形图的过程:

数据供应商通常通过 Python、Java 或其他语言的 API 提供订阅服务。在本例中,交易数据通过 DolphinDB Python API 写入流表。DolphinDB 的时间序列引擎以指定的频率进行实时 OHLC 计算。

此示例使用文本文件trades.csv来模拟实时数据。下表显示了它的列名和一行样本数据:

计算结果的输出表包含以下 7 列:

以下部分描述了实时计算 OHLC 条形图的步骤。

2.1 使用Python接收实时数据并写入DolphinDB流表

  • 在 DolphinDB 中创建流表
share streamTable(100:0, `Symbol`Datetime`Price`Volume,[SYMBOL,DATETIME,DOUBLE,INT]) as Trade
  • 将模拟数据插入流表

由于’Datetime’列的单位是SECOND,而pandas中的DataFrame只能使用DolphinDB中的nanotimestamp数据类型对应的DateTime[64],我们需要在将数据插入流表之前转换’Datetime’列的数据类型.

import dolphindb as ddb import pandas as pd import numpy as np csv_file = "trades.csv" csv_data = pd.read_csv(csv_file, dtype={'Symbol':str} ) csv_df = pd.DataFrame(csv_data) s = ddb.session(); s.connect("127.0.0.1",8848,"admin","123456") # Upload DataFrame to DolphinDB and convert the data type of column 'Datetime' s.upload({"tmpData":csv_df}) s.run("data = select Symbol, datetime(Datetime) as Datetime, Price, Volume from tmpData") s.run("tableInsert(Trade,data)")

2.2 实时计算 OHLC 条形图

可以在 DolphinDB 中实时移动窗口中计算 OHLC 条。一般有以下两种情况:

  1. 计算在窗口结束后进行
  • 非重叠窗口
  • 部分重叠的窗口

2. 计算在当前窗口内连续进行

2.2.1 windows结束后进行计算

对于非重叠窗口,为参数 ‘windowSize’ 和参数 ‘step’ 设置相同的值 function createTimeSeriesAggregator。对于重叠窗口,设置“windowSize”>“step”。请注意,“windowSize”必须是“step”的倍数。

非重叠窗口:每 5 分钟计算前 5 分钟的 OHLC 条形图。

share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC1 tsAggr1 = createTimeSeriesAggregator(name="tsAggr1", windowSize=300, step=300, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC1, timeColumn=`Datetime, keyColumn=`Symbol) subscribeTable(tableName="Trade", actionName="act_tsAggr1", offset=0, handler=append!{tsAggr1}, msgAsTable=true);

重叠窗口:每 1 分钟计算前 5 分钟的 OHLC 条形图。

share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC2 tsAggr2 = createTimeSeriesAggregator(name="tsAggr2", windowSize=300, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC2, timeColumn=`Datetime, keyColumn=`Symbol) subscribeTable(tableName="Trade", actionName="act_tsAggr2", offset=0, handler=append!{tsAggr2}, msgAsTable=true);

2.2.2 一个窗口内的多重计算

如果未指定“updateTime”,则窗口的计算不会在窗口的结束时间之前发生。要在当前窗口结束之前对其进行计算,我们可以指定“updateTime”。‘step’ 必须是 ‘updateTime’ 的倍数。

如果指定了“updateTime”,则当前窗口内可能会发生多次计算。这些计算通过以下规则触发:

(1)将当前窗口划分为’windowSize’/'updateTime’小窗口。每个小窗口的长度为“updateTime”。当小窗口结束后有新记录到达时,如果当前窗口中至少有一条记录未用于计算(不包括新记录),则触发计算。请注意,此计算不使用新记录。

(2)如果在一条记录到达聚合器后max(2*updateTime, 2 seconds),它还没有被用于计算,则触发计算。此计算包括当时当前窗口中的所有数据。

如果指定了“keyColumn”,则这些规则适用于每个组。

当前窗口内每个计算结果的时间戳是当前窗口开始时间或开始时间+‘windowSize’(取决于参数’useWindowStartTime’),而不是当前窗口内的时间戳。

如果指定了“updateTime”,则“outputTable”必须是键控表(使用函数创建keyedTable)。

在以下示例中,我们计算 1 分钟 OHLC 条形图。当前窗口的计算在新消息到达后不迟于 2 秒触发。

首先,创建一个流表作为输出表,并使用列 ‘datetime’ 和 ‘Symbol’ 作为主键。

share keyedTable(`datetime`Symbol, 100:0, `datetime`Symbol`open`high`low`close`volume,[DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC

在时序引擎中,参数“updateTime”设置为 1(秒);参数’useWindowStartTime’设置为true,这意味着输出表的第一列是窗口的开始时间。

tsAggr = createTimeSeriesAggregator(name="tsAggr", windowSize=60, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol, updateTime=1, useWindowStartTime=true)

订阅流表“Trade”:

subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggr}, msgAsTable=tr

2.3 在 Python 中显示 OHLC 条

在本例中,聚合器的输出表也被定义为流表。客户端可以通过 Python API 订阅输出表,并将计算结果显示到 Python 终端。

以下脚本使用 Python API 订阅实时聚合计算的输出表 OHLC,并打印结果。

from threading import Event import dolphindb as ddb import pandas as pd import numpy as np s = ddb.session() # set local port 20001 for subscribed streaming data s.enableStreaming(20001) def handler(lst): print(lst) # subscribe to the stream table OHLC (local port 8848) s.subscribe ("127.0.0.1", 8848, handler, "OHLC") Event().Wait()

您还可以通过 Grafana 等数据可视化工具连接 DolphinDB 数据库查询输出表并以图表形式显示结果。

原文标题:How to Calculate OHLC Bars in DolphinDB
原文作者:Davis Zhou
原文地址:https://dzone.com/articles/how-to-calculate-ohlc-bars-in-dolphindb

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论