暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

eKuiper 与百度智能边缘框架 BIE 集成方案

EMQX 2021-06-25
2886

本文转自百度智能边缘 BIE

Baetyl 项目由百度发起,是国内首个加入 LF Edge 的边缘计算项目,旨在将云计算能力拓展至用户现场,提供临时离线、低延时的计算服务,包括设备接入、消息路由、消息远程同步、函数计算、设备信息上报、配置下发等功能。Baetyl 和智能边缘 BIE(Baidu-IntelliEdge)云端管理套件配合使用,整体可达到边缘计算、云端管理、边云协同的效果,满足各种边缘计算场景。

本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算框架 Baetyl 来实现对业务的快速、低成本和有效地处理。

在各类物联网项目中,比如智能楼宇项目,需要将楼宇的数据(比如电梯、燃气、水电等)进行采集和分析。一种解决方案是将所有的设备直接接入在云端的物联网平台,类似于像 Baidu IoT Core 或者 AWS IoT Core。这种解决方案的问题在于,数据处理时延较长:
数据处理时延较长:通过 Internet 传输和云端的处理后返回给设备,所需时间较长 数据传输和存储成本:通过 Internet 传输需要带宽,对于大规模连接的物联网项目来说,耗费的带宽会相当可观 数据的安全性:有些物联网的数据会相当敏感,全部通过物联网传输的话会有风险 为了解决以上的问题,业界提出了边缘计算的方案,边缘计算的核心就在于把数据进行就近处理,避免不必要的时延、成本和安全问题。开源框架 Baetyl 是百度贡献给 Linux 基金会的开源边缘计算框架,主推物联网场景下端侧的边缘计算解决方案。
本文将流处理模块 eKuiper 部署到边缘计算框架 baetyl 上,对一段时间内边缘侧的设备消息进行流式处理,并将处理结果上传云端进行存储。

  业务场景  

假设现有一组设备,组中的每个设备有一个 id,通过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计如下,其中 {device_id} 为设备的 id。

    devices/{device_id}/messages
    每个设备发送的数据格式为 JSON,发送的通过该传感器采集的温度与湿度数据。
      {
      "temperature": 30,
      "humidity" : 20
      }
      现在需要实时分析数据,并提出以下的需求:对每个设备的温度数据按照每 10 秒钟计算平均值(t_av),并且记下 10 秒钟内的最大值 (t_max)、最小值(t_min) 和数据条数(t_count),计算完毕后将这 4 个结果进行保存,以下为样例结果数据:
        [
        {
        "device_id" : "1", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2
        },
        {
        "device_id" : "2", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2
        },
        ...
        ]

          方案介绍  
        如下图所示,我们将在 baetyl 边缘计算框架上,采用边缘分析/流式数据处理的方式,从 baetyl-broker 订阅相关设备消息,最后将处理结果输出到 Baidu 的 IoT Core 中。

        baetyl-broker  (https://github.com/baetyl/baetyl-broker) 是 Baetyl 框架端侧的消息中间件,采用 MQTT3.1.1 协议,可在低带宽、不可靠网络中提供可靠的消息传输服务。

        eKuiper (https://github.com/lf-edge/ekuiper/)是基于 SQL 的轻量级边缘流式数据分析引擎,安装包只有约 7MB,非常适合于运行在边缘设备端。

        Baidu IoT Core (https://cloud.baidu.com/doc/IoTCore/index.html) 提供了比较全的设备接入和数据分析的方案,此处用于云端的结果数据接入,以及应用所需的结果数据分析。

          安装 baetyl 计算框架  
        在云端新建边缘节点并安装到边缘设备。安装成功后如下所示:

          新建 IoT Core 实例  
        参考百度云 IoTCore 文档 (https://cloud.baidu.com/doc/IoTCore/s/Akck4811r新建 IoT Core 实例并进行相关的设备模板配置。配置成功后,使用 MqttBox 进行连接,其中 $iot/test/user/# 主题是我们自定义的具有收发权限的用户主题

          配置 baetyl-broker  
        编辑 baetyl-broker 配置,暴露一个外部端口供测试使用。

        然后编辑配置文件,配置文件如下:

          listeners:
          - address: tcp://0.0.0.0:8003
          principals:
          - username: test
          password: hahaha
              permissions:
          - action: pub
          permit: ["#"]
          - action: sub
          permit: ["#"]

          session:
          sysTopics:
          - $link
          - $baetyl
          logger:
          level: debug
          encoding: console
          新增 8003 端口供测试使用。并且需要设置映射宿主机端口以供连接。

          然后在本地使用 MqttBox 连接 baetyl-broker,来测试连通性。

            安装 eKuiper  

          从 eKuiper 官方镜像仓库镜像仓库选取 eKuiper 的官方 Docker 镜像,这里选取的是:

             emqx/kuiper:0.5.1-alpine

            然后创建容器服务,并添加 eKuiper 服务,设置镜像、添加端口映射以及环境变量。

              MQTT_BROKER_ADDRESS=baetyl-broker.baetyl-edge-system:8003
              MQTT_BROKER_USERNAME=test
              MQTT_BROKER_PASSWORD=hahaha

              用户在端侧可以通过 telnet 命令来判断边缘设备上 eKuiper 是否启动成功。

              更多 eKuiper 资料可以参考 eKuiper (https://github.com/lf-edge/ekuiper/) 官网 。

                安装集成 eKuiper 插件  
              eKuiper 原生的 stream、rule 创建都是通过 Http 请求,为了适配 baetyl 平台,可以使用 eKuiper 推出的适配插件: kuiper-kubernetes-tool (https://github.com/lf-edge/ekuiper/tree/master/tools/kubernetes) ,支持从配置文件加载 stream、rule 配置。

              从 eKuiper 官方镜像仓库镜像仓库选取 eKuiper 的官方 Docker 镜像,这里选取的是:

                 emqx/kuiper-kubernetes-tool:0.5.1

                我们在新建 eKuiper 插件应用时,先新建对应的配置文件。

                 创建流语法解析

                创建流的目的是为了定义发送到该流上的数据格式,类似于在关系数据库中定义表的结构。eKuiper 中所有支持的数据类型,可以参考 eKuiper (https://github.com/lf-edge/ekuiper/) 官网 。
                  {
                  "commands":[
                  {
                  "url":"/streams",
                  "description":"create stream1",
                  "method":"post",
                  "data":{
                  "sql":"create stream demo (temperature float, humidity bigint) WITH (FORMAT=\"JSON\", DATASOURCE=\"devices/+/messages\");"
                  }
                  }
                  ]
                  }
                  上述语句在 eKuiper 中创建了一个名为 demo 的流定义,包含了两个字段,分别为 temperature 和 humidity,数据源为订阅 MQTT 的主题 devices/+/messages,这里请注意采用了通配符 +,用于订阅不同设备的消息。

                   数据业务逻辑处理语法解析

                  eKuiper 采用 SQL 实现业务逻辑,每10秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 如下所示。

                    {
                    "commands":[
                    {
                    "url":"/rules",
                    "description":"create rule1",
                    "method":"post",
                    "data":{
                    "id":"rule1",
                    "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
                    "actions": [
                    {
                    "log": {}
                    },
                    {
                    "mqtt": {
                    "server": "tcp://arncpan.iot.gz.baidubce.com:1883",
                    "topic": "$iot/test/user/b",
                    "protocol_version": "3.1.1",
                    "qos": 0,
                    "clientId": "demo_001",
                    "username": "arncpan/test",
                    "password": "xxxx"
                    }
                    }
                    ]
                    }
                    }]
                    }
                    这里的 SQL 用了四个聚合函数,用于统计在10秒钟窗口期内的相关值。
                    • avg: 平均值

                    • max: 最大值

                    • min: 最小值

                    • count: 计数

                    另外还使用了几个基本的函数:

                    • mqtt: 消息中取出 MQTT 协议的信息,mqtt(topic) 就是取得当前取得消息的主题名称

                    • split_value: 该函数将第一个参数使用第二个参数进行分割,然后第三个参数指定下标,取得分割后的值。所以函数 split_value("devices/001/messages", "/", 1)调用就返回001

                    • GROUP BY 跟的是分组的字段,分别为计算字段 device_id;时间窗口 TUMBLINGWINDOW(ss, 10),该时间窗口的含义为每10秒钟生成一批统计数据。

                    actions 列表中的 mqtt 类型的 action 的相关配置信息是 IoT Core 的连接信息。这里注意替换 IoTCore 的连接信息。

                     创建命令配置项

                    将上述两步的语法填写到配置项中。创建配置项如下:

                     创建配置信息配置项

                    配置信息用于 kuiper-kubernetes-tool 连接 eKuiper 模块,其中指定了 eKuiper 的 ip、port 等信息。
                      port: 9081
                      timeout: 500
                      intervalTime: 30
                      ip: "kuiper"
                      logPath: "log/kuiper.log"
                      commandDir: "sample"

                      关于配置详情可以参考 kuiper-kubernetes-tool  (https://github.com/lf-edge/ekuiper/tree/master/tools/kubernetes) 文档 。其中 9081 端口是 eKuiper 默认的 Restful API 端口。

                      创建配置项如下:

                       创建 kuiper-tool 应用

                      新建容器服务,并添加 kuiper-kubernetes-tool 服务,设置镜像、添加上两步的配置项。

                      如果上述步骤都安装正确,在边缘设备执行如下命令,可以得到如下结果:

                        测试  
                      我们使用 Mqtt Box 模拟设备向事先约定的 Topic 主题发送消息,观察 IoT Core 是否可以收到流式处理的结果。

                      我们分别向 Baetyl-Broker 发送两条消息:

                        {"temperature": 30, "humidity" : 80}
                        {"temperature": 60, "humidity" : 80}

                        预期 10s 后 IoT Core 会收到如下消息:

                          [{"device_id":"device_001","t_av":45,"t_count":2,"t_max":60,"t_min":30}]

                          实际操作:

                          向 baetyl-broker 发送两条消息。

                          IoTCore 查看:

                          如上图,符合预期。

                          此时观察端上应用的资源消耗:

                          可以看出流式处理引擎 eKuiper 只消耗了极小的内存和CPU。

                          通过本文,读者可以基于 Baetyl 边缘计算框架快速集成 eKuiper 流式处理引擎,快速搭建边缘侧的流式解决方案,灵活地开发出基于边缘数据分析的系统,实现数据的低时延、低成本和安全的处理。

                          点击"阅读原文" ,了解更多

                          ↓↓↓

                          文章转载自 EMQX,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论