持续聚合功能在实际应用中有许多落地场景,比如 Streaming ETL、实时分析、监控报警等。其中一个最常见的应用是降采样(Downsampling),使用窗口函数,可以把一个毫秒级输出频率的信号降采样到秒级(比如通过计算一秒内的平均值),这样就可以节省存储和计算成本。
进一步地,例如一个速度传感器高频输入大量数据,持续聚合功能可以对这些输入数据进行过滤,过滤掉速度低于或高于一定数值的数据点,并且计算每五分钟内的平均速度,最后将结果输出到结果表中。
持续聚合功能由 Flow Engine 提供。Flow 是 GreptimeDB 内置的一个轻量级流处理引擎,为用户提供了持续聚合、窗口计算等功能。用户可以直接使用 SQL 语句来创建一个 Flow 任务进行持续聚合,无需额外编写业务代码。Flow 任务可用于实时数据处理、实时计算等场景。
持续聚合功能可以完全通过 SQL 来定义和使用,本文将演示从创建 Flow 任务,接受数据进行流处理,到删除该 Flow 任务的全部流程。
我们以一个速度传感器读入左右轮的瞬时速度,并且过滤掉较高或较低的异常值,并计算五秒内的平均速度为例。
CREATE TABLE velocity (
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
left_wheel FLOAT,
right_wheel FLOAT,
TIME INDEX(ts)
);
CREATE TABLE avg_speed (
avg_speed FLOAT,
start_window TIMESTAMP TIME INDEX,
end_window TIMESTAMP,
update_at TIMESTAMP,
);
接下来就可以创建 Flow 任务了,这里需要使用我们提供的 SQL 方言语法 CREATE FLOW,可以参考如下例子:
CREATE FLOW calc_avg_speed
SINK TO avg_speed
AS
SELECT avg((left_wheel+right_wheel)/2)
FROM velocity
WHERE left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60
GROUP BY tumble(ts, '5 second');
上述 SQL 语句的意思是:创建一个名为 calc_avg_speed 的 Flow 任务,它会把结果输出到 avg_speed 表上。其上运行的查询由 AS 之后的 SELECT 语句定义。
首先,过滤掉左右轮速度值中过小或过大的值(小于等于 0.5 或大于等于 60);然后,基于输入表 velocity,以 ts 列的每五秒的间隔作为窗口,计算窗口内的平均速度。Flow 作业当中的查询完全基于 SQL 语法,并根据需求实现了相关扩展。
INSERT INTO velocity
VALUES
("2021-07-01 00:00:00.200", 0.0, 0.7),
("2021-07-01 00:00:00.200", 0.0, 61.0),
("2021-07-01 00:00:02.500", 2.0, 1.0,);
SELECT * FROM avg_speed;
avg_speed | start_window | end_window | update_at
-----------+----------------------------+----------------------------+----------------------------
1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
(1 row)
INSERT INTO velocity
VALUES
("2021-07-01 00:00:05.100", 5.0, 4.0),
("2021-07-01 00:00:09.600", 2.3, 2.1);
SELECT * FROM avg_speed;
avg_speed | start_window | end_window | update_at
-----------+----------------------------+----------------------------+----------------------------
1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
3.35 | 2021-07-01 00:00:05.000000 | 2021-07-01 00:00:10.000000 | 2024-06-04 03:35:34.693000
avg_speed 表中的列解释如下:
avg_speed:窗口中计算得到的平均速度;
start_window:窗口的开始时间;
end_window:窗口的结束时间;
update_at:更新行数据的时间。
最后,使用 DROP FLOW 删除这个 Flow 任务:
DROP FLOW calc_avg_speed;
创建 Flow 的语法是:
CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS
<SQL>;
上述创建 Flow 任务的语法的解释如下:
flow-name 是全局唯一的标识符。
sink-table-name 是存储聚合数据的表名。它可以是一个现有的表或一个新表。如果目标表不存在,Flow 将自动创建目标表。
EXPIRE AFTER 是一个可选的时间间隔(使用 SQL的 INTERVAL 语法表示),用于从 Flow 引擎中清除过期的中间状态。
COMMENT 是 Flow 任务的注释性描述。
<SQL> 部分对应具体的持续聚合查询。Flow 计算引擎会从中提取引用到的表名并且作为 Flow 任务的源表。
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, '5 minutes');
其中 EXPIRE AFTER 项可能需要进一步解释。简单来说,像所有现代的流处理系统一样, Flow 计算引擎有两个重要概念:系统时间和事件时间。
系统时间:也叫处理时间,就是进行流处理计算的机器的系统时间。
事件时间:某一行数据代表的事件发生的时间,一般也会记录在该行数据的某一列中,Flow 将 TIME INDEX 列视为事件时间。
EXPIRE AFTER 过期机制利用系统时间和事件时间之间的差值,清除掉 Flow 中间状态中过于古老的行。,上面示例 SQL 中,事件时间老于系统时间一小时以上的行就会被清除掉,不再参与运算。
🌟 注意,EXPIRE AFTER 只作用于新到达的数据。因此输出表中的结果不会单纯因为时间流逝而产生变化,只是不再会有更老的数据被更新到结果表上了。
另外, Flow 的中间状态目前也没有进行任何持久化,而是纯内存的,之后会添加持久化功能以使其可以在重启后也能保证数据正确。
删除 Flow
DROP FLOW [IF EXISTS] <name>
Flow 目前支持的聚合函数
除了 count、sum、avg、min、max 这几种聚合函数,Flow 目前还支持了加减乘除、比较和逻辑运算几种标量函数,以及固定窗口的 tumble 函数。
未来,我们计划在持续聚合中支持更多的聚合函数、标量函数和窗口函数。
本文介绍了 GreptimeDB 中持续聚合功能的基本用法和特性,并且举例说明了创建、使用和删除 Flow 任务的流程。使用持续聚合可以随时、低延时(秒级/亚秒级)地获取用户关心的信息,同时避免了额外的内存和计算上的开销。
未来,除了支持更多函数之外,我们还会支持流处理中间状态的持久化和诸如 Temporal Filter 等高级功能,更详细的信息可以参考相关的 用户文档[1] 和 开发指南[2]。
Reference:
[1] https://docs.greptime.com/user-guide/continuous-aggregation/overview
[2] https://docs.greptime.com/contributor-guide/flownode/overview
关于 Greptime
Greptime 格睿科技专注于为物联网(如智慧能源、智能汽车等)及可观测等产生大量时序数据的领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前主要有以下三款产品:
GreptimeDB 是一款用 Rust 语言编写的开源时序数据库,具有云原生、无限水平扩展、高性能和融合分析等特点,帮助企业实时读写、处理和分析时序数据的同时,降低长期存储的成本。我们提供 GreptimeDB 企业版,支持更多企业特性和定制化服务,如有需要欢迎联系我们:15310923206(同微信)。
GreptimeCloud 是一款全托管的云上数据库即服务(DBaaS)解决方案,基于开源时序数据库 GreptimeDB 打造,能够高效支持可观测、物联网、金融等领域的应用。用户可以通过内置的可观测性解决方案 GreptimeAI 全面掌握 LLM 应用的成本、性能、流量和安全等情况。
车云一体解决方案 是一款深入车企实际业务场景的时序数据库解决方案,解决了企业车辆数据呈几何倍数增长后的实际业务痛点。多模态车端数据库结合云端 GreptimeDB 企业版帮助车企极大降低流量、计算和存储成本,并帮助提升数据实时性和业务洞察能力。

Star us on GitHub Now:
https://github.com/GreptimeTeam/greptimedb
官网:https://greptime.cn/
文档:https://docs.greptime.cn/
Twitter: https://twitter.com/Greptime
Slack: https://greptime.com/slack
LinkedIn: https://www.linkedin.com/company/greptime/
往期精彩文章:


👇 点击下方阅读原文,立即体验 GreptimeDB!





