LF Edge eKuiper(由 EMQ 发起,现已捐献给 LF Edge 基金会)是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 设计的一个主要目标就是将在云端运行的实时流式计算框架 (如 Apache Spark、Apache Storm、Apache Flink) 迁移到边缘端。eKuiper 参考了上述云端流式处理项目的架构与实现,结合边缘流式数据处理的特点,采用了编写基于源 (Source)、SQL (业务逻辑处理)、目标 (Sink) 的规则引擎来实现边缘端的流式数据处理。
社区站网址:https://ekuiper.org/zh
GitHub 仓库:https://github.com/lf-edge/ekuiper
我们非常高兴地宣布 eKuiper 1.11.0 版本现已正式发布!9月7号,EMQ 将进行 eKuiper 1.11 新版本的功能演示直播,预约二维码在文末。
本次发布的主题是进一步增强 SQL 语法,提供更具表达力的规则。此外,我们还为边缘环境提供了多项功能,专门优化规则在边缘环境的管理和运行。特别值得一提的是,我们引入了更灵活、更强大的 Sink 缓存重发策略,以缓解边缘网络的不稳定问题。
本版本汇集了来自 22 名贡献者的 187 个 PR,充分展现了社区的合作精神。欢迎您升级至 eKuiper 1.11.0,开启更强大的数据转换和分析能力!
扩展 SQL 语法
通过新增内置函数和 SQL 语法,我们引入了更多的数据转换和分析能力。
扩展内置函数
我们新增了超过 50 个全新的函数,包括日期时间函数、数组操作函数、对象操作函数、数学函数和类型转换函数。这大大丰富了计算和转换的能力。
新版本引入了兼容关系数据库例如 MySQL 的日期时间函数和数学函数。这样原有的数据库 SQL 可以方便地迁移到流式分析版本。
针对物联网常见的嵌套数据,新版本引入了更多的数组和对象操作函数,提供了所有常见的嵌套数据变换的基础能力。例如,以下的 SQL 语句可以对数组类型的字段 arr
进行去重和排序等操作:SELECT
ARRAY_DISTINCT(arr1), ARRAY_MAP(abs, arr1), ARRAY_SHUFFLE(arr1), ARRAY_SORT(arr1) FROM funcDemo
扩展语法
我们拓展了 SQL 语法,包括扩展通配符、别名引用,支持单引号字符串和 limit 子句,提升了表达能力和易用性。
通配符扩展支持 Except 和 Replace 从句。在物联网场景下,流的字段可能远远超过传统数据库,甚至可达到数百的字段。原生通配符 *
灵活性不足,只能选择全部字段。若要从字段中去除少量的字段,则需要在 SELECT 语句中列出一长串的字段。通配符的 except 语法即可解决这个问题。在以下例子中,我们去除了字段 a
和 c
。
SELECT * EXCEPT(a, c) FROM demo
新的别名引用功能同样大大提高了 SQL 语法的易用性和简洁性。在以下的例子中,SELECT 从句中定义的别名 ab 在同一个从句中被引用。在标准 SQL 中,这种用法是不合法的。若要引用别名,用户需要使用子查询或 CTE,SQL 语句会复杂得多,可读性也不佳。如果是嵌套引用,那就需要嵌套子查询,可读性就更加雪上加霜。别名引用的功能解决了这个问题,一定程度上也避免了重复计算,有助于性能提升。
SELECT a+b as ab, ab + c as abc FROM demo 新增了一系列的函数,包括窗口函数 row_number、累积分析函数以及规则级元数据函数如 rule_start。
与聚合函数相同,窗口函数也是作用于多行数据例如窗口数据。只是聚合函数会将多行合称为一行,例如 avg 会计算多行的平均值。而窗口函数计算后多行数据仍输出多行,计算结果为行的级别。本版本首次引入了窗口函数,添加了第一个函数 row_number(),他会将窗口的每一行添加一个行号,方便后续计算。
新的版本中,eKuiper 首次提供了规则级的累积分析函数和元数据相关函数,可以在除窗口计算之外,计算更长生命周期中的数据,包括累积的和、最大最小值、平均值等,也包括规则的累积触发次数、规则开始时间等。用户可以基于这些累积值实现更多的过滤和告警场景。
增强流式分析
创新的滑动窗口
我们首次引入了在流处理领域中条件触发,采集前后数据的滑动窗口,简化了相关数据的采集。在灵活数据采集的场景中,我们经常会有某个事件触发后采集前后数据的需求。用新的滑动窗口可以轻松地实现。以下的例子中,我们的规则实现了在温度变为 0 时,采集前后各 1 秒数据的需求。
{"id": "ruleCollect","name": "前后数据:温度变为0度时,获取前后各1秒数据","sql": "SELECT * FROM mockFileStream GROUP BY SLIDINGWINDOW(ss, 1, 1) OVER (WHEN temperature < 0 AND lag(temperature, 1, 0) >=0 )","actions": [{"mqtt": {"topic": "result/rule_2","server": "tcp://127.0.0.1:1883"}}],"options": {"isEventTime": true}}
优化的事件时间处理
我们重新设计了 watermark 算法和事件时间管理,引入了事件时间处理到连续查询和窗口查询中。在连续查询中使用事件时间,我们就可以处理乱序数据,使得同一份数据多次运行都可以得到一致的结果。
使用事件时间仅需要两个步骤:
创建流时,通过
timestamp
参数配置事件中的时间戳字段。创建规则时,设置 options 参数
isEventTime=true
并配置lateTolerance
参数即可。
为带有模式的数据流,我们对 JSON 解码进行了优化,提供更快的性能。如果数据源中的字段较多而规则中实际使用的字段少,在流定义中定义 Schema 可以大大提高运行时性能。
边缘规则管理
Sink 缓存重发策略使用户能够更高效地管理重发数据,具备不同的目标(MQTT 主题)和优先级。新的版本中:
重发数据和实时数据可以配置独立的通道,可并行发送。
重发数据可以配置单独的发送目的地,例如与实时数据不同的 MQTT 主题。下游应用可订阅不同的主题以分开处理实时数据和重发数据。
重发数据和实时数据可灵活配置优先级。
重发数据可添加字段标示,方便下游应用区分处理。
支持规则的运行时间范围
我们为计划规则引入了时间范围的支持,使其能够在指定的时间段内执行。
我们引入了动态更新配置的能力,包括日志调试级别和规则级别调试选项,从而简化了运行时的调试过程。用户可在运行时通过 API 变更全局的日志调试级别。例如下面的例子中,可在运行时更改日志级别为 debug,并打开文件 Log。
PATCH http://{{host}}/configsContent-Type: application/json{"debug":true,"fileLog":true}
之前的版本中,所有规则和 eKuiper 本身的日志都混合在同一个日志文件中。单独调试某个规则时,干扰比较多。新的版本中,规则级别可单独开启 debug 并输出到单独的日志文件中。如下例所示,通过规则 options 配置,可以单独配置规则级别日志。
{"id": "rule_debug","sql": "SELECT * FROM pubdata","actions": [{"log": {}}],"options": {"debug": true,"logFilename": "demo-rule.log"}}
结语






