暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

用户指南 | 如何使用 Flow 功能实现持续聚合,赋能实时计算和查询

GreptimeDB 2024-06-04
114


在最新发布的版本 v0.8 中,GreptimeDB 实现了 Flow Engine 来支持持续聚合功能,用户可以实时计算和查询数据的总和、平均值或使用其他聚合计算功能。

本文介绍了 GreptimeDB 中持续聚合功能的基本用法和特性,并且举例说明了创建、使用和删除 Flow 任务的流程。



01

什么是持续聚合



持续聚合功能在实际应用中有许多落地场景,比如 Streaming ETL、实时分析、监控报警等。其中一个最常见的应用是降采样(Downsampling),使用窗口函数,可以把一个毫秒级输出频率的信号降采样到秒级(比如通过计算一秒内的平均值),这样就可以节省存储和计算成本。


进一步地,例如一个速度传感器高频输入大量数据,持续聚合功能可以对这些输入数据进行过滤,过滤掉速度低于或高于一定数值的数据点,并且计算每五分钟内的平均速度,最后将结果输出到结果表中。


持续聚合功能由 Flow Engine 提供。Flow 是 GreptimeDB 内置的一个轻量级流处理引擎,为用户提供了持续聚合、窗口计算等功能。用户可以直接使用 SQL 语句来创建一个 Flow 任务进行持续聚合,无需额外编写业务代码。Flow 任务可用于实时数据处理、实时计算等场景




02

应用示例



持续聚合功能可以完全通过 SQL 来定义和使用,本文将演示从创建 Flow 任务,接受数据进行流处理,到删除该 Flow 任务的全部流程。


我们以一个速度传感器读入左右轮的瞬时速度,并且过滤掉较高或较低的异常值,并计算五秒内的平均速度为例。


首先,创建一个源数据表作为输入:
CREATE TABLE velocity (
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    left_wheel FLOAT,
    right_wheel FLOAT,
    TIME INDEX(ts)
);


以及作为 Flow 任务输出的结果表:
CREATE TABLE avg_speed (
    avg_speed FLOAT,
    start_window TIMESTAMP TIME INDEX,
    end_window TIMESTAMP,
    update_at TIMESTAMP,
);


接下来就可以创建 Flow 任务了,这里需要使用我们提供的 SQL 方言语法 CREATE FLOW,可以参考如下例子:

CREATE FLOW calc_avg_speed
SINK TO avg_speed
AS
SELECT avg((left_wheel+right_wheel)/2)
FROM velocity
WHERE left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60
GROUP BY tumble(ts, '5 second');


上述 SQL 语句的意思是:创建一个名为 calc_avg_speed 的 Flow 任务,它会把结果输出到 avg_speed 表上。其上运行的查询由 AS 之后的 SELECT 语句定义。


首先,过滤掉左右轮速度值中过小或过大的值(小于等于 0.5 或大于等于 60);然后,基于输入表 velocity,以 ts 列的每五秒的间隔作为窗口,计算窗口内的平均速度。Flow 作业当中的查询完全基于 SQL 语法,并根据需求实现了相关扩展。


现在 Flow 任务已经创建,想要观察 avg_speed 中的持续聚合的结果,只需要向源数据表 velocity 中插入数据:
INSERT INTO velocity 
VALUES
    ("2021-07-01 00:00:00.200", 0.00.7),
    ("2021-07-01 00:00:00.200", 0.061.0),
    ("2021-07-01 00:00:02.500", 2.01.0,);


注意其中前两行都因为不符合条件被过滤掉了,只留下第三行被用于计算,查询输出表就可以得到计算结果:
SELECT * FROM avg_speed;

 avg_speed |        start_window        |         end_window         |         update_at          
-----------+----------------------------+----------------------------+----------------------------
       1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
(1 row)


尝试向 velocity 表中插入更多数据:
INSERT INTO velocity 
VALUES
    ("2021-07-01 00:00:05.100", 5.04.0),
    ("2021-07-01 00:00:09.600", 2.32.1);


结果表 avg_speed 现在包含两行:分别表示两个 5 秒窗口的平均值,1.5 和 3.35(=(4.5+2.2)/2)
SELECT * FROM avg_speed;


 avg_speed |        start_window        |         end_window         |         update_at          
-----------+----------------------------+----------------------------+----------------------------
       1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
      3.35 | 2021-07-01 00:00:05.000000 | 2021-07-01 00:00:10.000000 | 2024-06-04 03:35:34.693000


avg_speed 表中的列解释如下:

  • avg_speed:窗口中计算得到的平均速度;

  • start_window:窗口的开始时间;

  • end_window:窗口的结束时间;

  • update_at:更新行数据的时间。


其中 start_windowend_window 是 Flow 引擎的时间窗口函数 tumble 自动添加的。update_at 则是 Flow 引擎对 Flow 任务输出表自动添加的一列,用于标记这一行数据的最新更新时间,以便了解 Flow 任务的运行情况。

最后,使用 DROP FLOW 删除这个 Flow 任务:

DROP FLOW calc_avg_speed;



03

Flow 管理
及高级特性



创建或更新 Flow

创建 Flow 的语法是:

CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS 
<SQL>;


上述创建 Flow 任务的语法的解释如下:

  • flow-name 是全局唯一的标识符。

  • sink-table-name 是存储聚合数据的表名。它可以是一个现有的表或一个新表。如果目标表不存在,Flow 将自动创建目标表。

  • EXPIRE AFTER 是一个可选的时间间隔(使用 SQL的 INTERVAL 语法表示),用于从 Flow 引擎中清除过期的中间状态。

  • COMMENT 是 Flow 任务的注释性描述。

  • <SQL> 部分对应具体的持续聚合查询。Flow 计算引擎会从中提取引用到的表名并且作为 Flow 任务的源表。


一个简单的示例:
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, '5 minutes');


其中 EXPIRE AFTER 项可能需要进一步解释。简单来说,像所有现代的流处理系统一样, Flow 计算引擎有两个重要概念:系统时间和事件时间。

  • 系统时间:也叫处理时间,就是进行流处理计算的机器的系统时间。

  • 事件时间:某一行数据代表的事件发生的时间,一般也会记录在该行数据的某一列中,Flow 将 TIME INDEX 列视为事件时间。


EXPIRE AFTER 过期机制利用系统时间和事件时间之间的差值,清除掉 Flow 中间状态中过于古老的行。,上面示例 SQL 中,事件时间老于系统时间一小时以上的行就会被清除掉,不再参与运算。


🌟 注意,EXPIRE AFTER 只作用于新到达的数据。因此输出表中的结果不会单纯因为时间流逝而产生变化,只是不再会有更老的数据被更新到结果表上了。

另外, Flow 的中间状态目前也没有进行任何持久化,而是纯内存的,之后会添加持久化功能以使其可以在重启后也能保证数据正确。


删除 Flow

使用如下语句即可删除一个 Flow 任务:
DROP FLOW [IF EXISTS] <name>


Flow 目前支持的聚合函数

除了 countsumavgminmax 这几种聚合函数,Flow 目前还支持了加减乘除、比较和逻辑运算几种标量函数,以及固定窗口的 tumble 函数。


未来,我们计划在持续聚合中支持更多的聚合函数、标量函数和窗口函数。


04

总结



本文介绍了 GreptimeDB 中持续聚合功能的基本用法和特性,并且举例说明了创建、使用和删除 Flow 任务的流程。使用持续聚合可以随时、低延时(秒级/亚秒级)地获取用户关心的信息,同时避免了额外的内存和计算上的开销。


未来,除了支持更多函数之外,我们还会支持流处理中间状态的持久化和诸如 Temporal Filter 等高级功能,更详细的信息可以参考相关的 用户文档[1]开发指南[2]



Reference:

[1] https://docs.greptime.com/user-guide/continuous-aggregation/overview

[2] https://docs.greptime.com/contributor-guide/flownode/overview




关于 Greptime

Greptime 格睿科技专注于为物联网(如智慧能源、智能汽车等)及可观测等产生大量时序数据的领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前主要有以下三款产品:


GreptimeDB 是一款用 Rust 语言编写的开源时序数据库,具有云原生、无限水平扩展、高性能和融合分析等特点,帮助企业实时读写、处理和分析时序数据的同时,降低长期存储的成本。我们提供 GreptimeDB 企业版,支持更多企业特性和定制化服务,如有需要欢迎联系我们:15310923206(同微信)。


GreptimeCloud 是一款全托管的云上数据库即服务(DBaaS)解决方案,基于开源时序数据库 GreptimeDB 打造,能够高效支持可观测、物联网、金融等领域的应用。用户可以通过内置的可观测性解决方案 GreptimeAI 全面掌握 LLM 应用的成本、性能、流量和安全等情况


车云一体解决方案 是一款深入车企实际业务场景的时序数据库解决方案,解决了企业车辆数据呈几何倍数增长后的实际业务痛点。多模态车端数据库结合云端 GreptimeDB 企业版帮助车企极大降低流量、计算和存储成本,并帮助提升数据实时性和业务洞察能力。


GreptimeDB 作为开源项目,欢迎对时序数据库、Rust 语言等内容感兴趣的同学们参与贡献和讨论。第一次参与项目的同学推荐先从带有 good first issue 标签的 issue 入手,期待在开源社群里遇见你!扫码添加小助手备注“技术交流群”立即加入讨论吧~


Star us on GitHub Now: 

https://github.com/GreptimeTeam/greptimedb


官网:https://greptime.cn/

文档:https://docs.greptime.cn/

Twitter: https://twitter.com/Greptime

Slack: https://greptime.com/slack

LinkedIn: https://www.linkedin.com/company/greptime/


点击下方链接🔗关注 GreptimeDB,了解更多技术干货👇

往期精彩文章:


👇 点击下方阅读原文,立即体验 GreptimeDB!


文章转载自GreptimeDB,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论