本文转自百度智能边缘 BIE
Baetyl 项目由百度发起,是国内首个加入 LF Edge 的边缘计算项目,旨在将云计算能力拓展至用户现场,提供临时离线、低延时的计算服务,包括设备接入、消息路由、消息远程同步、函数计算、设备信息上报、配置下发等功能。Baetyl 和智能边缘 BIE(Baidu-IntelliEdge)云端管理套件配合使用,整体可达到边缘计算、云端管理、边云协同的效果,满足各种边缘计算场景。
本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算框架 Baetyl 来实现对业务的快速、低成本和有效地处理。
假设现有一组设备,组中的每个设备有一个 id,通过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计如下,其中 {device_id} 为设备的 id。
devices/{device_id}/messages
{"temperature": 30,"humidity" : 20}
[{"device_id" : "1", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2},{"device_id" : "2", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2},...]

eKuiper (https://github.com/lf-edge/ekuiper/)是基于 SQL 的轻量级边缘流式数据分析引擎,安装包只有约 7MB,非常适合于运行在边缘设备端。
Baidu IoT Core (https://cloud.baidu.com/doc/IoTCore/index.html) 提供了比较全的设备接入和数据分析的方案,此处用于云端的结果数据接入,以及应用所需的结果数据分析。






然后编辑配置文件,配置文件如下:
listeners:- address: tcp://0.0.0.0:8003principals:- username: testpassword: hahahapermissions:- action: pubpermit: ["#"]- action: subpermit: ["#"]session:sysTopics:- $link- $baetyllogger:level: debugencoding: console



从 eKuiper 官方镜像仓库镜像仓库选取 eKuiper 的官方 Docker 镜像,这里选取的是:
emqx/kuiper:0.5.1-alpine
然后创建容器服务,并添加 eKuiper 服务,设置镜像、添加端口映射以及环境变量。
MQTT_BROKER_ADDRESS=baetyl-broker.baetyl-edge-system:8003MQTT_BROKER_USERNAME=testMQTT_BROKER_PASSWORD=hahaha




用户在端侧可以通过 telnet 命令来判断边缘设备上 eKuiper 是否启动成功。

更多 eKuiper 资料可以参考 eKuiper (https://github.com/lf-edge/ekuiper/) 官网 。
从 eKuiper 官方镜像仓库镜像仓库选取 eKuiper 的官方 Docker 镜像,这里选取的是:
emqx/kuiper-kubernetes-tool:0.5.1
我们在新建 eKuiper 插件应用时,先新建对应的配置文件。
◆ 创建流语法解析
{"commands":[{"url":"/streams","description":"create stream1","method":"post","data":{"sql":"create stream demo (temperature float, humidity bigint) WITH (FORMAT=\"JSON\", DATASOURCE=\"devices/+/messages\");"}}]}
◆ 数据业务逻辑处理语法解析
eKuiper 采用 SQL 实现业务逻辑,每10秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 如下所示。
{"commands":[{"url":"/rules","description":"create rule1","method":"post","data":{"id":"rule1","sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)","actions": [{"log": {}},{"mqtt": {"server": "tcp://arncpan.iot.gz.baidubce.com:1883","topic": "$iot/test/user/b","protocol_version": "3.1.1","qos": 0,"clientId": "demo_001","username": "arncpan/test","password": "xxxx"}}]}}]}
avg: 平均值
max: 最大值
min: 最小值
count: 计数
另外还使用了几个基本的函数:
mqtt: 消息中取出 MQTT 协议的信息,mqtt(topic) 就是取得当前取得消息的主题名称
split_value: 该函数将第一个参数使用第二个参数进行分割,然后第三个参数指定下标,取得分割后的值。所以函数 split_value("devices/001/messages", "/", 1)调用就返回001
GROUP BY 跟的是分组的字段,分别为计算字段 device_id;时间窗口 TUMBLINGWINDOW(ss, 10),该时间窗口的含义为每10秒钟生成一批统计数据。
actions 列表中的 mqtt 类型的 action 的相关配置信息是 IoT Core 的连接信息。这里注意替换 IoTCore 的连接信息。
◆ 创建命令配置项
将上述两步的语法填写到配置项中。创建配置项如下:

◆ 创建配置信息配置项
port: 9081timeout: 500intervalTime: 30ip: "kuiper"logPath: "log/kuiper.log"commandDir: "sample"
创建配置项如下:

◆ 创建 kuiper-tool 应用





我们分别向 Baetyl-Broker 发送两条消息:
{"temperature": 30, "humidity" : 80}{"temperature": 60, "humidity" : 80}
预期 10s 后 IoT Core 会收到如下消息:
[{"device_id":"device_001","t_av":45,"t_count":2,"t_max":60,"t_min":30}]
实际操作:


此时观察端上应用的资源消耗:

通过本文,读者可以基于 Baetyl 边缘计算框架快速集成 eKuiper 流式处理引擎,快速搭建边缘侧的流式解决方案,灵活地开发出基于边缘数据分析的系统,实现数据的低时延、低成本和安全的处理。

点击"阅读原文" ,了解更多。
↓↓↓




