Spring Boot集成MQTT
支持动态创建topic。
添加依赖
<!-- MQTT --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
添加配置文件
server:
  port: 9999

# mqtt的配置
mqtt:
  server:
    url: tcp://ip:1883
    port: 1883
    username: 用户名
    password: 密码
  client:
    consumerId: consumerCo
    publishId: publishCo
  default:
    topic: topic
    completionTimeout: 3000
MQTT配置文件
package net.xiangcaowuyu.mqtt.config;

import net.xiangcaowuyu.mqtt.utils.MqttReceiveHandle;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Spring Integration wiring for MQTT publishing and subscription, with support for
 * adding and removing subscribed topics at runtime.
 *
 * @author : laughing
 * DateTime: 2021-05-18 13:31
 */
@Configuration
public class MqttConfig {

    public final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Payload of the last-will message published by the broker on unexpected disconnect. */
    private static final byte[] WILL_DATA = "offline".getBytes(StandardCharsets.UTF_8);

    @Resource
    private MqttReceiveHandle mqttReceiveHandle;

    // NOTE: these fields must NOT be `final` with a literal initializer. A final String
    // initialized with a literal is a compile-time constant, so javac inlines the literal
    // at every use site and the value injected by @Value would never be read.
    @Value("${mqtt.server.url}")
    private String url;

    @Value("${mqtt.server.port:1883}")
    private String port;

    @Value("${mqtt.server.username}")
    private String username;

    @Value("${mqtt.server.password}")
    private String password;

    @Value("${mqtt.client.consumerId:consumerClient}")
    private String consumerId;

    @Value("${mqtt.client.publishId:publishClient}")
    private String publishId;

    @Value("${mqtt.default.topic:topic}")
    private String topic;

    @Value("${mqtt.default.completionTimeout:3000}")
    private int completionTimeout;

    /** Message-driven inbound adapter; created lazily and shared by the topic management methods. */
    private MqttPahoMessageDrivenChannelAdapter adapter;

    /** Comma-separated list of additional topics to subscribe to when the inbound adapter is built. */
    private String listenTopics = "";

    /**
     * MQTT connection options.
     *
     * @return the options used by the client factory
     */
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // true: every connection starts with a fresh session;
        // false: the broker keeps the client's session state between connections.
        mqttConnectOptions.setCleanSession(true);
        // Connection timeout, in seconds.
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{url});
        // Keep-alive interval, in seconds.
        mqttConnectOptions.setKeepAliveInterval(10);
        // Last-will message: published by the broker if the connection drops unexpectedly.
        mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }

    /**
     * MQTT client factory.
     *
     * @return a factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Channel carrying outgoing messages (producer side).
     *
     * @return the outbound channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that publishes messages from the outbound channel to the broker (producer side).
     *
     * @return the outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(publishId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(topic);
        return messageHandler;
    }

    /**
     * Inbound adapter bound to the consumer client and its subscribed topics (consumer side).
     *
     * @return the message-driven channel adapter
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter current = ensureAdapter();
        subscribeMissing(current, listenTopics.split(","));
        current.setCompletionTimeout(completionTimeout);
        current.setConverter(new DefaultPahoMessageConverter());
        current.setQos(2);
        current.setOutputChannel(mqttInputChannel());
        return current;
    }

    /**
     * Subscribes to additional topics.
     *
     * @param topicArr topics to add; {@code null}, blank and already-subscribed entries are ignored
     * @return the topics subscribed after the call
     */
    public List<String> addListenTopic(String[] topicArr) {
        MqttPahoMessageDrivenChannelAdapter current = ensureAdapter();
        if (topicArr != null) {
            subscribeMissing(current, topicArr);
        }
        return Arrays.asList(current.getTopic());
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param topic the topic to remove; ignored when it is not currently subscribed
     * @return the topics subscribed after the call
     */
    public List<String> removeListenTopic(String topic) {
        MqttPahoMessageDrivenChannelAdapter current = ensureAdapter();
        if (Arrays.asList(current.getTopic()).contains(topic)) {
            current.removeTopic(topic);
        }
        return Arrays.asList(current.getTopic());
    }

    /**
     * Channel carrying incoming messages (consumer side).
     *
     * @return the inbound channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that passes every received message to {@link MqttReceiveHandle} (consumer side).
     *
     * @return the inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                mqttReceiveHandle.handle(message);
            }
        };
    }

    /** Returns the shared adapter, creating it with the configured default topic on first use. */
    private MqttPahoMessageDrivenChannelAdapter ensureAdapter() {
        if (adapter == null) {
            adapter = new MqttPahoMessageDrivenChannelAdapter(consumerId, mqttClientFactory(), this.topic);
        }
        return adapter;
    }

    /** Adds each non-blank candidate that is not yet subscribed (QoS 1), skipping duplicates. */
    private void subscribeMissing(MqttPahoMessageDrivenChannelAdapter target, String[] candidates) {
        Set<String> subscribed = new HashSet<>(Arrays.asList(target.getTopic()));
        for (String candidate : candidates) {
            // Set.add returns false for a topic already present, so a duplicate inside
            // `candidates` is not passed to addTopic a second time.
            if (StringUtils.hasText(candidate) && subscribed.add(candidate)) {
                target.addTopic(candidate, 1);
            }
        }
    }
}
消息处理
package net.xiangcaowuyu.mqtt.utils;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/**
 * Receives MQTT messages and logs them; also implements the Paho {@link MqttCallback}
 * hooks with logging-only behavior.
 */
@Component
public class MqttReceiveHandle implements MqttCallback {

    private final Logger logger = LoggerFactory.getLogger(MqttReceiveHandle.class);

    /** Thread-safe formatter for the timestamp logged with each message; built once and reused. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Logs the MQTT headers and payload of a received message, then processes the payload text.
     * Any exception raised while processing is logged and not rethrown.
     *
     * @param message the received message
     */
    public void handle(Message<?> message) {
        try {
            logger.info("{},客户端号:{},主题:{},QOS:{},消息接收到的数据:{}",
                    LocalDateTime.now().format(TIMESTAMP_FORMAT),
                    message.getHeaders().get(MqttHeaders.ID),
                    message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
                    message.getHeaders().get(MqttHeaders.RECEIVED_QOS),
                    message.getPayload());
            // Process the MQTT payload.
            this.handle(message.getPayload().toString());
        } catch (Exception e) {
            // The throwable goes last so the logger records the full stack trace,
            // instead of printing it to stderr.
            logger.error("处理错误{}", e.getMessage(), e);
        }
    }

    /**
     * Processes the payload text; currently only logs it.
     *
     * @param str the payload as text
     * @throws Exception declared so future processing logic may fail; not thrown by the current body
     */
    private void handle(String str) throws Exception {
        logger.info(str);
    }

    /**
     * Logs a warning, including the cause, when the connection is lost.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.warn("连接丢失", throwable);
    }

    /**
     * Logs the topic, payload and message id of an arrived message.
     *
     * @param topic       the topic the message was published to
     * @param mqttMessage the arrived message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // NOTE(review): the value logged under the "clientId" label is MqttMessage#getId(),
        // i.e. the message id; the label is kept unchanged for log compatibility.
        logger.info("消息到达:{}\n消息内容:{}\nclientId:{}",
                topic,
                new String(mqttMessage.getPayload(), StandardCharsets.UTF_8),
                mqttMessage.getId());
    }

    /**
     * Logs the id of the client whose delivery completed.
     *
     * @param iMqttDeliveryToken the token of the completed delivery
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("clientId:{}", iMqttDeliveryToken.getClient().getClientId());
    }
}
消息发送
package net.xiangcaowuyu.mqtt.utils;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Messaging gateway whose calls are sent to the {@code mqttOutboundChannel} channel.
 *
 * @author : laughing
 * DateTime: 2021-05-18 13:44
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends a text payload without specifying a topic.
     *
     * @param data the text to send
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload to the given topic.
     *
     * @param topic   the destination topic
     * @param payload the message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload to the given topic with an explicit quality of service.
     *
     * @param topic   the destination topic
     * @param qos     the delivery guarantee:
     *                0 - no retry, so the message may be lost if the subscriber misses it;
     *                1 - retried until received, so the subscriber may see duplicates;
     *                2 - adds de-duplication, so the subscriber receives the message once
     * @param payload the message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);
}
测试
package net.xiangcaowuyu.mqtt.controller;

import net.xiangcaowuyu.mqtt.config.MqttConfig;
import net.xiangcaowuyu.mqtt.utils.MqttGateway;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.List;

/**
 * Demo endpoints for subscribing to a topic, unsubscribing from a topic and
 * publishing a test message.
 */
@RestController
public class MqttController {

    @Resource
    private MqttGateway mqttGateway;

    @Resource
    private MqttConfig mqttConfig;

    /**
     * Subscribes to the topic given in the path.
     *
     * @param topic the topic to subscribe to
     * @return the string form of the topic list returned by {@link MqttConfig#addListenTopic}
     */
    @GetMapping("/add/{topic}")
    public String addTopic(@PathVariable("topic") String topic) {
        return mqttConfig.addListenTopic(new String[]{topic}).toString();
    }

    /**
     * Publishes a timestamped test message to the {@code temperature1} topic with QoS 2.
     *
     * @return the literal {@code "OK"}
     */
    @GetMapping("/pub")
    public String pubTopic() {
        String payload = "client msg at: " + System.currentTimeMillis();
        mqttGateway.sendToMqtt("temperature1", 2, payload);
        return "OK";
    }

    /**
     * Unsubscribes from the topic given in the path.
     *
     * @param topic the topic to unsubscribe from
     * @return the string form of the topic list returned by {@link MqttConfig#removeListenTopic}
     */
    @GetMapping("/del/{topic}")
    public String delTopic(@PathVariable("topic") String topic) {
        return mqttConfig.removeListenTopic(topic).toString();
    }
}

文章转载自香草物语博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




