如果您的应用程序处理来自设备的时间序列数据并提供有关它们的实时丰富见解,您可能会遇到以下应用程序的可扩展性要求:
- 支持高吞吐量和低延迟,同时从数千到数百万个 IoT 设备摄取数十亿个指标。
- 支持亚秒级响应时间和高并发(100 名用户),同时支持实时仪表板。
人们普遍认为只有 NoSQL 数据库才能提供这种规模。 Azure 上的 Citus 使您能够在 PostgreSQL 生态系统中构建大型 IoT 应用程序,从而打破了这些障碍。
- 由于 Postgres,您的 IoT 应用程序获得了丰富的 SQL 接口,支持更新、删除、过滤、连接、聚合、扩展等,以及数据一致性和可靠性保证。
- 由于跨多个服务器分布表的超能力和无缝横向扩展的能力,您的应用程序随着它的发展获得所需的性能和可扩展性。您可以从单个节点开始,然后无缝扩展到多个节点。
本博客将介绍用于构建可扩展 IoT 应用程序的端到端参考架构。 azure 上的 Postgres 托管服务中的超大规模 (Citus)(也称为 Citus on Azure)是首选数据库。该博客还涵盖了您在构建 IoT 应用程序时可以牢记的某些最佳实践和注意事项。

Azure 上 IOT 工作负载的参考架构
好的,让我们开始吧。 首先,让我们对各种工具和服务进行高级演练,这些工具和服务共同创建了一个参考架构,使您能够构建一个可扩展的 IoT 应用程序。
该参考架构的主要组件包括:
- 使用 Azure IoT Hub 将您的设备连接到云
- 使用 Azure Databricks 实时处理和引入设备数据
- 在 Azure 上使用 Citus 存储和查询时间序列 IoT 数据
- 使用 Power BI 和 Grafana 对 IoT 数据进行交互式可视化

图 1:使用 Postgres、Citus 和 Azure 构建可扩展 IoT 应用程序的参考架构。该架构描述了来自设备的 IoT 数据如何通过 Azure IoT Hub 进入云并在 Databricks 中进行处理,然后通过 Azure 上的 Citus 被摄取到 Postgres 的端到端流程。带有 Citus 的 Postgres 是可扩展的关系数据库,用于存储设备数据并通过 Grafana 或 Power BI(以及 Azure Functions)提供实时仪表板。这篇文章的重点是所有这些部分如何结合在一起,使您能够在 Postgres 生态系统中构建大规模的 IoT 应用程序。
使用 Azure IoT Hub 将您的设备连接到云
Azure IoT Hub 是 Azure 中的一项托管服务,它充当中央消息中心,用于 IoT 应用程序和设备之间的双向通信。您可以使用 Azure IoT 中心构建 IoT 解决方案,在您的 IoT 设备和云托管解决方案后端之间进行可靠且安全的通信。您几乎可以将任何设备连接到 IoT 中心。
我们创建了一个 GitHub 存储库,可让您生成数千台在 IoT 中心注册并大规模发送消息的设备。此存储库使您能够大规模模拟设备数据并测试 IoT 中心。在构建 IoT 应用程序时,您还可以将脚本作为 CI/CD 管道的一部分插入此存储库中。
使用 Azure Databricks 实时处理和引入设备数据
Azure Databricks 可用于处理和摄取来自 Azure IoT 中心的设备数据。 Azure Databricks 是具有 Kafka 兼容性的容错流处理引擎,可用于连续处理。您可以使用 Spark Structured Streaming 进行数据的实时摄取和微批处理。
以下代码片段显示 Databricks 从 IoT 中心获取设备数据、对其进行处理并将其摄取到 Azure 上的超大规模 (Citus) 中。
- 以下 Scala 代码片段将侦听来自 EventHub(或 IoT 中心)设备主题的流,并创建 spark DataFrame 以进行进一步转换。记下导入库 kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule 此模块默认安装在 Databricks 集群上。
import org.apache.spark.sql.functions._
val EventHubs_ConnectionString = "Endpoint=sb://***.servicebus.windows.net/;SharedAccessKeyName=twitter;SharedAccessKey= <shared_access_key>";
val constring = "\"$ConnectionString\"";
val splchar = "\"";
val EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=" + "" + constring + "" + " password=" + splchar + EventHubs_ConnectionString + splchar + ";";
val df = spark
.readStream
.format("kafka") .option("subscribe", "<your topic>")
.option("kafka.bootstrap.servers", "<iot_hub_name>.servicebus.windows.net:9093")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", "$Default")
.option("failOnDataLoss", "true")
.load() //Dataframe is loaded untill here
- 以下 Scala 代码片段会将转换后的 Spark DataFrame 写入 Hyperscale (Citus)
//Create your postgresql configurations
def postgresqlSinkOptions: Map[String, String] = Map(
"dbtable" -> "public.devicedata", // table
"user" -> "citus", // Database username
"password" -> "<database password>", // Password
"driver" -> "org.postgresql.Driver",
"url" -> "jdbc:postgresql://<citus_server_group_name>.postgres.database.azure.com:5432/postgres",
"sslmode" -> "require"
)
//Writing device data to Citus/Postgres
df.writeStream
.foreachBatch { (batchdf: DataFrame, _: Long) =>
batchdf.write
.format("jdbc")
.options(postgresqlSinkOptions)
.mode("Append")
.save()
}
.start()
.awaitTermination()
使用 Postgres COPY 命令进行超快速数据摄取
上面的代码触发了 Postgres INSERT 命令,用于批量摄取数据。使用批量 INSERT,您可以预期每秒摄取几千行的吞吐量。但是,如果您想要更大的吞吐量,可以使用 Postgres COPY 命令。
COPY 命令可让您定期(可以低至每 30 秒)对行进行微批量处理,并近乎实时地摄取数据。使用 COPY,我们的一些客户已经看到每秒摄取数百万行的吞吐量。然而,吞吐量取决于数据模型(行宽)和硬件配置。
注意:Databricks 的基于 JDBC 的 Postgres 驱动程序本身不支持 COPY 命令。因此,我们编写了一个简单的 scala 包装器,将 COPY 命令功能扩展到 Databricks。
在 Azure 上使用 Citus 存储和查询时间序列 IoT 数据
Citus 是 PostgreSQL 扩展的,具有分布式表的超能力。这种超能力使您能够构建高度可扩展的关系应用程序。您可以开始在单个节点服务器组上构建应用程序,就像使用 PostgreSQL 一样。随着应用程序的可扩展性和性能要求的增长,您可以通过透明地分布表来无缝地扩展到多个节点。
以下是您在使用 Hyperscale (Citus) 构建应用程序时可以遵循的一些最佳实践:
- 分布列:要分布你的表,你需要选择一个列(a.k.a shard key)来决定数据如何在节点之间分布。分片键是核心部分,为您的数据添加了自然维度。在 IoT 应用程序中,大多数时候分布列是您的设备的标识符(例如 device_id)。可以在此链接中找到有关分布式数据建模的更多信息。
- 对时间序列数据进行分区:由于设备数据都是时间序列并且具有时间维度,因此请根据时间对表进行分区。超大规模 (Citus) 提供时间序列功能来创建和管理分区。
- JSONB 存储设备数据:您可以使用 JSONB 数据类型的列来存储和索引半结构化数据。设备遥测数据通常不是结构化的,每种设备类型都有自己独特的指标。
- 用于地理空间分析的 PostGIS:如果您的 IoT 应用程序需要地理空间功能,您可以使用 Hyperscale (Citus) 原生支持的 PostGIS 扩展。
- 用于快速分析的汇总(又名预聚合):您可以使用汇总来预聚合原始数据并避免查询的重复计算。您可以根据仪表板中的缩放级别(基于时间)将原始数据汇总为每分钟、每小时或每天的聚合。
使用 Power BI 和 Grafana 对 IoT 数据进行交互式可视化
具有实时流式传输的 Power BI 可让您实时流式传输数据和更新仪表板。在 Power BI 中创建的任何视觉对象或仪表板都可以显示和更新实时数据和视觉对象。如上述参考架构所示,您可以使用无服务器 Azure 函数定期查询 Hyperscale (Citus) 中的设备数据,并将其发布到 Power BI 以进行实时可视化。
Grafana 可用于构建时间序列仪表板或基于时间尺度显示事件日志(操作日志)指标。默认情况下,Grafana 带有 PostgreSQL 集成。对于云上的托管 Grafana,您可以使用最近推出预览版的 Azure Managed Grafana 服务。
开始使用 Azure 上的 Postgres 和 Citus 构建您自己的 IoT 应用程序
要在 5 分钟内亲身体验 Citus,您可以浏览此 Azure 快速入门。由于 Citus 是完全开源的,您可以在本地机器上轻松下载和测试 Citus。希望您发现该博客有用,分享了一些与使用 Citus 构建时间序列应用程序相关的信息链接。
- 使用 Citus 构建时间序列应用程序的用例指南
- 如何使用 Citus 为时间序列数据扩展 Postgres?
- 何时使用超大规模 (Citus)?
- 使用 Citus 高效分发 Postgres – 如何选择正确的分片键?
原文标题:Building IoT apps using Postgres, Citus, and Azure
原文作者:Manjunath Suryanarayana
原文地址:https://techcommunity.microsoft.com/t5/azure-database-for-postgresql/building-iot-apps-using-postgres-citus-and-azure/ba-p/3501175




