暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

突破连接边界:EMQX 实现 MQTT 和 NATS 协议双向互通

EMQX 2025-07-15
332

在当今高度互联的数字化世界中,实时数据流的复杂性日益增长,尤其是在物联网(IoT)和微服务架构领域。企业和开发者面临着一个普遍的挑战:各种数据协议和系统往往各自为政,形成难以逾越的「数据孤岛」。这种碎片化的局面不仅增加了开发和维护的巨大开销,还阻碍了对数据潜力的全面发掘和利用,导致关键业务洞察和实时决策的缺失。

作为服务于物联网实时智能的统一 MQ + AI 平台,EMQX 致力于提供高效可靠的物联网连接。自 5.0 版本起,EMQX 便引入了强大的协议网关特性,旨在打破传统 MQTT 协议的边界,使其能够接收来自其他非 MQTT 协议的客户端连接。这一创新为 EMQX 赋予了卓越的多协议接入能力,使其成为一个真正意义上的统一消息平台,能够无缝集成各种异构系统和设备。

随着最新版本 EMQX 5.10.0 的发布,EMQX 协议网关家族又迎来了一位新成员:EMQX NATS 网关。这项新功能进一步扩展了 EMQX 的连接边界,实现了 MQTT 与 NATS 协议之间的原生、双向互通,为构建更灵活、更强大的实时数据基础设施提供了前所未有的可能性。



什么是 NATS 协议


NATS(Neural Autonomic Transport System)是一个高性能、轻量级、云原生的消息系统,专为现代分布式应用设计。它以其简洁、高效的特点而闻名,支持发布-订阅(Publish-Subscribe)、请求-响应(Request-Reply)等多种消息模式,并提供了丰富的客户端库,覆盖多种编程语言。

NATS 的核心特点包括:

  • 高性能与低延迟: NATS 采用轻量级协议和优化的路由机制,确保消息以极高的吞吐量和低延迟进行传输。Core NATS 提供「至多一次」(At-most-once)的消息传递语义,适用于对速度和可用性要求极高的场景。

  • 云原生设计: NATS 从设计之初就考虑了云环境的特性,易于部署在裸机、虚拟机、容器或 Kubernetes 等任何环境中,并支持集群化部署以实现高可用性和可扩展性。

  • 简洁性: NATS 协议简单,客户端库易于使用,降低了开发和运维的复杂性。

  • 主题寻址: NATS 基于主题(Subject)进行消息路由,并支持单层和多层的主题通配符,这使得 M:N(多对多)通信变得轻松。

尽管 NATS 和 MQTT 在各自领域都表现出色,但它们之间存在协议差异,传统上需要复杂的定制桥接才能实现互通。EMQX NATS 网关的出现,正是为了弥合这一鸿沟。它打通了 NATS 和 MQTT 协议,使得 IoT 设备(通常使用 MQTT)能够与后端微服务(通常使用 NATS)无缝共享数据,从而打破数据孤岛,为构建更全面、更具洞察力的应用提供了无限可能。

这种集成不仅简化了系统架构,还为企业带来了前所未有的灵活性,使其能够根据具体需求选择最适合的协议,同时确保所有组件之间的无缝通信。



EMQX NATS 网关的快速配置与启动


本节将为您提供一个快速指南,介绍如何安装最新版本的 EMQX 5.10.0,以及如何配置和使用 EMQX NATS 网关。

   安装 EMQX 5.10.0

首先,您需要安装 EMQX 5.10.0。您可以从 EMQX 官方下载页面 获取适用于您操作系统的安装包(例如 Debian 或 macOS 等)。

此处,以 Docker 为例:

    docker run --name emqx \
     -p 18083:18083 -p 1883:1883 -p 20243:20243 \
     -d emqx/emqx-enterprise:5.10.0
    启动成功后,您可以通过访问 http://localhost:18083/ 进入 EMQX Dashboard,默认用户名密码为 admin
    /public。
       开启和配置 NATS 网关

    EMQX NATS 网关的配置非常灵活,可以通过 Dashboard 或配置文件进行:

    1. 登录 EMQX Dashboard。

    2. 在左侧导航栏中,点击管理 -> 网关

    3. 找到 NATS 网关,点击配置。

    4. 进入基础参数配置,保持默认即可。

    其中:

    • 挂载点:为所有 NATS 客户端发布/订阅的主题设置一个固定前缀。此处为空,表示不设置任何前缀。

    • 默认心跳间隔:配置 NATS 网关向客户端发送心跳的间隔时间。

    • 心跳超时阈值:即网关等待心跳的最大超时时间。此处为 5 秒,即 5 秒后未收到客户端的心跳应答,即认为客户端已断线。

    5. 点击下一步,进入到监听器配置页面,点击添加监听器。配置监听器名称为 default
     监听地址为 20243
     端口,点击添加完成监听器配置。

    6. 设置完成后,点击启用即完成 NATS 网关的配置和启动。



    使用演示:通过 Python 客户端代码实现 NATS 与 MQTT 消息互通


    本节将通过 Python 客户端代码示例,演示如何连接 NATS 网关,实现 NATS 客户端与 MQTT 客户端之间的双向消息互通。

    首先,确保您已安装 Python 环境,并安装了 NATS 和 Paho MQTT 客户端库:

      pip install nats-py paho-mqtt

      我们将演示以下两种情况:

      1. NATS 客户端发布消息,MQTT 客户端订阅并接收。

      2. MQTT 客户端发布消息,NATS 客户端订阅并接收。

         NATS 客户端发布,MQTT 客户端接收

      nats_publisher.py
       
      此脚本连接到 EMQX NATS 网关

      并向 
      iot.sensor.data.temperature

       Subject 发布消息:

        import asyncio
        import nats
        async def run():
            nc = await nats.connect(servers=["nats://localhost:20243"])
            print("NATS Publisher connected to EMQX NATS Gateway.")
            subject = "sensor.data.temperature"
            message = b'{"device_id": "sensor_001", "temp": 25.5}'
            await nc.publish(subject, message)
            print(f"Published NATS message to subject '{subject}': {message.decode()}")
            await nc.drain()
            print("NATS Publisher disconnected.")
        if __name__ == '__main__':
            asyncio.run(run())

        mqtt_subscriber.py

         :

        此脚本连接到 EMQX MQTT 监听器,并订阅映射后的 MQTT 主题 sensor/data/temperature

         。

          import paho.mqtt.client as paho
          from paho import mqtt
          import time
          # MQTT 消息回调函数
          def on_message(client, userdata, msg):
              print(f"Received MQTT message on topic '{msg.topic}': {msg.payload.decode()}")
          def run():
              client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
              client.on_message = on_message
              # 连接到 EMQX MQTT 监听器 (默认端口 1883)
              client.connect("localhost"188360)
              print("MQTT Subscriber connected to EMQX.")
              # 订阅映射后的 MQTT 主题
              # 根据 NATS Gateway 的 topic_mapping 规则,iot.sensor.data.temperature 映射到 sensor/data/temperature
              client.subscribe("sensor/data/temperature", qos=1)
              print("MQTT Subscriber subscribed to 'sensor/data/temperature'.")
              client.loop_forever()
          if __name__ == '__main__':
              run()

          运行步骤:

          1. 首先运行 mqtt_subscriber.py


          2. 然后运行 nats_publisher.py

          。 您将看到 mqtt_subscriber.py



           接收到 NATS 客户端发布的消息。

             MQTT 客户端发布,NATS 客户端接收
          mqtt_publisher.py


           :此脚本连接到 EMQX MQTT 监听器
          并向 command/device/light_001

           主题发布消息。
            import paho.mqtt.client as paho
            from paho import mqtt
            import time
            def run():
                client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
                # 连接到 EMQX MQTT 监听器 (默认端口 1883)
                client.connect("localhost"188360)
                print("MQTT Publisher connected to EMQX.")
                topic = "command/device/light_001"
                message = '{"action": "turn_on", "brightness": 80}'
                client.publish(topic, message, qos=1)
                print(f"Published MQTT message to topic '{topic}': {message}")
                client.disconnect()
                print("MQTT Publisher disconnected.")
            if __name__ == '__main__':
                run()
            nats_subscriber.py


             :此脚本连接到 EMQX NATS 网关,并订阅映射后的 NATS Subject command.device.light_001

             。
              import asyncio
              import nats
              async def message_handler(msg):
                  print(f"Received NATS message on subject '{msg.subject}': {msg.data.decode()}")
              async def run():
                  # 连接到 EMQX NATS Gateway
                  nc = await nats.connect(servers=["nats://localhost:20243"])
                  print("NATS Subscriber connected to EMQX NATS Gateway.")
                  # 订阅映射后的 NATS Subject
                  # command/device/light_001 映射到 command.device.light_001
                  await nc.subscribe("command.device.light_001", cb=message_handler)
                  print("NATS Subscriber subscribed to 'device.command.light_001'.")
                  # 保持连接,等待消息
                  try:
                      while True:
                          await asyncio.sleep(1)
                  except asyncio.CancelledError:
                      pass
                  finally:
                      await nc.drain()
                      print("NATS Subscriber disconnected.")
              if __name__ == '__main__':
                  asyncio.run(run())

              运行步骤:

              1. 首先运行 nats_subscriber.py



              2. 然后运行 mqtt_publisher.py


              。您将看到 nats_subscriber.py


               接收到 MQTT 客户端发布的消息。

              通过这些简单的示例,您可以看到 EMQX NATS 网关无缝地在 MQTT 和 NATS 协议之间转换和转发消息,极大简化了异构系统间的集成工作。



              总结


              EMQX 5.10.0 NATS 网关的发布,是 EMQX 在构建统一、灵活的实时数据基础设施方面迈出的又一重要步伐。它通过提供 MQTT 和 NATS 协议之间的原生、双向互通能力,有效地打破了实时通信领域长期存在的协议壁垒,为构建更加互联互通、灵活高效的分布式系统奠定了坚实基础。

              这项创新不仅显著简化了复杂的集成挑战,消除了对定制桥接或独立消息中间件的需求,从而降低了开发和运营成本,更开启了物联网、微服务和实时控制等领域应用的新篇章。无论是智能工厂中的传感器数据流向云端微服务进行实时分析,还是后端控制系统向边缘设备发送指令,EMQX NATS 网关都能够确保数据在不同协议生态系统之间自由、高效地流动。

              点击阅读原文,咨询更多产品内容


              点击「阅读原文」了解更多








              文章转载自EMQX,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论