写在前面

org.eclipse.paho.client.mqttv3:主要用于对外提供服务,即整个Eclipse Paho对外的窗口。
org.eclipse.paho.client.mqttv3.internal:提供了对mqttv3 中的接口的实现。
org.eclipse.paho.client.mqttv3.internal.nls: 国际化相关。点进去你会惊讶的发现messages_zh_CN.properties,没错,你没有看错,这货支持中文。不过学习源码时这个包不太重要,可以忽略
org.eclipse.paho.client.mqttv3.internal.security:MQTT支持SSL加密,这个包内实现了基于TLS协议的SSLSocket,这个配置起来还是有一些复杂度的,我会单独开一章详细讲。
org.eclipse.paho.client.mqttv3.internal.websocket:websocket相关实现
org.eclipse.paho.client.mqttv3.internal.wire:MQTT协议中报文信息,里面包含有心跳包、订阅包、发布包、确认包等
org.eclipse.paho.client.mqttv3.persist:发布信息持久化类。MQTT提供两种保持发布消息的方式,一种是将信息保持到文件中;一种是直接保持到内存中。
org.eclipse.paho.client.mqttv3.util:工具类。
org.eclipse.paho.client.mqttv3.logging:日志包
上面标红色的即为Eclipse paho的核心代码,所以只要把这些内容搞清楚,基本上它的核心内容和执行流程也就了解了。其实Eclipse paho还是算比较轻量级的,代码量也不算非常大。对于这种偏工具类的中间件,我个人的习惯是找到它核心的功能,顺着流程看。就Eclipse paho来说,它是对MQTTClient的实现,核心功能无非就是与服务端建立连接,发布、订阅消息。所以接下来的几个章节,我也会从这些角度来分析。对于代码的解释大部分都以注释的形式写在每行代码的前面,所以小伙伴们浏览时一定要仔细的看一看代码,避免有疏漏看不懂的地方。
PS:这里先把Eclipse Paho的核心类及其相关作用贴在前面,方便小伙伴们查阅
项目源码地址:https://github.com/eclipse/paho.mqtt.java

一、创建异步的客户端对象
MqttAsyncClient
public class MqttPahoMessageHandler extends AbstractMqttMessageHandlerimplements MqttCallback, ApplicationEventPublisherAware {/*** The default completion timeout in milliseconds.*/// 默认的发送完成等待超时时间public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;// 最长发送完成超时时间,默认30秒private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;@Overrideprotected void publish(String topic, Object mqttMessage, Message<?> message) {Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");try {// 在这里创建连接并且发送消息IMqttDeliveryToken token = checkConnection().publish(topic, (MqttMessage) mqttMessage);// 如果是非异步的,会阻塞等待一条消息发送完成(发送成功并且收到相应的回执,这个取决于qos)// 如果是异步的,会通过事件回调的方式t通知发送完成,非阻塞if (!this.async) {token.waitForCompletion(this.completionTimeout); // NOSONAR (sync)}else if (this.asyncEvents && this.applicationEventPublisher != null) {this.applicationEventPublisher.publishEvent(new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),getClientInstance()));}}catch (MqttException e) {throw new MessageHandlingException(message, "Failed to publish to MQTT in the [" + this + ']', e);}}}

IMqttDeliveryToken token = checkConnection().publish(topic, (MqttMessage) mqttMessage);
检查连接
发送消息
下面我们分别介绍
检查连接
废话不说,直接看代码
private synchronized IMqttAsyncClient checkConnection() throws MqttException {// 检查客户端对象是否是连接状态或client对象非空// 是 -> 将客户端关闭,释放资源if (this.client != null && !this.client.isConnected()) {this.client.setCallback(null);this.client.close();this.client = null;}// 如果客户端对象为空,获取异步客户端对象,并用创建好的连接对象进行连接操作if (this.client == null) {try {MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();Assert.state(this.getUrl() != null || connectionOptions.getServerURIs() != null,"If no 'url' provided, connectionOptions.getServerURIs() must not be null");// 通过客户端工厂调用MQTTAsyncClient的构造方法创建一个客户端实例this.client = this.clientFactory.getAsyncClientInstance(this.getUrl(), this.getClientId());// 客户端实例计数器+1incrementClientInstance();// 设置回调this.client.setCallback(this);// 通过阻塞线程建立连接,保证连接建立成功后再执行其他操作this.client.connect(connectionOptions).waitForCompletion(this.completionTimeout);logger.debug("Client connected");}catch (MqttException e) {if (this.client != null) {this.client.close();this.client = null;}if (this.applicationEventPublisher != null) {this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));}throw new MessagingException("Failed to connect", e);}}return this.client;}
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence,MqttPingSender pingSender, ScheduledExecutorService executorService) throws MqttException {// Count characters, surrogate pairs count as one character.// clientId长度计算int clientIdLength = 0;for (int i = 0; i < clientId.length() - 1; i++) {if (Character_isHighSurrogate(clientId.charAt(i)))i++;clientIdLength++;}// 如果长度 > 65536 处理异常if (clientIdLength > 65535) {throw new IllegalArgumentException("ClientId longer than 65535 characters");}// 检查连接Broker URL的合法性NetworkModuleService.validateURI(serverURI);this.serverURI = serverURI;this.clientId = clientId;this.persistence = persistence;// 创建持久化对象,如果未设置,默认使用内存持久化if (this.persistence == null) {this.persistence = new MemoryPersistence();}this.executorService = executorService;// 创建持久化对象的容器 -> 即HashTablethis.persistence.open(clientId, serverURI);// 创建消息发送,接收处理器对象this.comms = new ClientComms(this, this.persistence, pingSender, this.executorService);// 清空内存持久容器数据this.persistence.close();this.topics = new Hashtable();}
这里要注意一下,Eclipse.paho有两种持久化策略
内存持久化
文件持久化
分别对应MqttClientPersistence接口的两个不同的实现MemoryPersistence和MqttDefaultFilePersistence,默认使用内存持久化进行存储,如果使用文件存储,可以显式指定。通过指定文件路径的方式设置文件存储位置。
我们再来看一下ClientComms初始化的过程
public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender, ExecutorService executorService) throws MqttException {// 设置当前状态为未连接状态this.conState = DISCONNECTED;this.client = client;this.persistence = persistence;// 心跳包发送器,在创建MqttAsyncClient时创建的,类型为TimerPingSenderthis.pingSender = pingSender;// 心跳包发送器初始化this.pingSender.init(this);this.executorService = executorService;// 创建用于追踪发送消息的追踪器对象this.tokenStore = new CommsTokenStore(getClient().getClientId());// 创建回调对象this.callback = new CommsCallback(this);// 初始化消息状态处理对象。this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender);callback.setClientState(clientState);}
这里有两个值得注意的地方
pingSender:Eclipse paho维护了自己的一套Client和 Broker探活的心跳机制,使用到了上述代码中提到的TimerPingSender,简单来说就是通过定时任务的方式不断的向Broker发送心跳包,如果在一段时间内不交互或者心跳无应答,即为异常,这是客户端会主动断开与Broker的连接。这个在后面的章节中会详细介绍,这里暂时不做展开
ClientState:保存正在发布的消息和将要发布的消息的状态信息,并对对应状态下的消息进行必要的处理,具体的处理方式会根据消息的质量(QOS)有所不同
在设置CommsCallback对象时,因为Eclipse paho集成了发布和订阅消息的功能,所以这里也会初始化两个队列用于存放接受消息的队列messageQueue和CompleteQueue。
protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenStore,CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender) throws MqttException {// messageId使用记录映射inUseMsgIds = new Hashtable();// 特殊消息处理队列pendingFlows = new Vector();// qos2类型消息的出站队列outboundQoS2 = new Hashtable();// qos1类型消息的出站队列outboundQoS1 = new Hashtable();// qos0类型消息的出站队列outboundQoS0 = new Hashtable();// qos2类型消息的入站队列inboundQoS2 = new Hashtable();// 初始化PingREQ对象,用于发送心跳包pingCommand = new MqttPingReq();// 针对qos2的消息,用于记录发出Rel但未收到Comp响应的计数器inFlightPubRels = 0;// 实际正在发出或接受,但未完成(针对qos1或qos2)回执处理的消息计数actualInFlight = 0;this.persistence = persistence;this.callback = callback;this.tokenStore = tokenStore;this.clientComms = clientComms;this.pingSender = pingSender;// 加载持久化的数据.restoreState();}
这里有两个值得注意的地方
这里一共初始化了6个队列,消息会在不同的阶段在各个队列中进行流转,这里暂时先不展开,之后会有章节专门介绍发送和接受消息的内部实现,介是会针对这些队列的作用做详细的说明
在初始化方法最后,调用了restoreState()方法,这个方法的主要作用简单来说就是将持久化到内存或者文件的数据重新加载到出站队列并进行发送操作,这里只说一下为什么会有这种机制,内部实现同样的等介绍发送和接受消息章节统一来解释。之所以有这样的机制,是因为当消息的qos设置为1或2时,发布者和broker,broker和订阅者之间需要通过非publish消息进行回执通信,这时如果网络连接失败或者网络抖动导致未收到回执,需要客户端重新发送这些消息以保证消息的可达性。再比如我们在发送消息时,可能因为业务峰值产生大量消息,此时消息会挤压在队列中,等待被依次发出,这时如果出现网络断开的情况,同样需要在恢复网络时重新进行发送。所以基于这样的一套重发机制的考虑才会在这里出现这样的操作。
至此,MQTTAsyncClient客户端对象创建完成。
二、与Broker建立连接
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {// 判断连接状态,如果是已连接/正在连接/正在断开连接/已关闭这几种状态的话进行异常处理if (comms.isConnected()) {throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);}if (comms.isConnecting()) {throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);}if (comms.isDisconnecting()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);}if (comms.isClosed()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);}// 获取连接信息对象,这个我们在配置配件中初始化过if (options == null) {options = new MqttConnectOptions();}this.connOpts = options;this.userContext = userContext;final boolean automaticReconnect = options.isAutomaticReconnect();// 根据不同的连接url创建网络模块comms.setNetworkModules(createNetworkModules(serverURI, options));comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));// Insert our own callback to iterate through the URIs till the connect// succeedsMqttToken userToken = new MqttToken(getClientId());ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,userToken, userContext, callback, reconnecting);userToken.setActionCallback(connectActionListener);userToken.setUserContext(this);// If we are using the MqttCallbackExtended, set it on the// connectActionListenerif (this.mqttCallback instanceof MqttCallbackExtended) {connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);}comms.setNetworkModuleIndex(0);// 通过监听器建立连接connectActionListener.connect();return userToken;}
这里我们来看一下创建网络模块的实现
protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options)throws MqttException, MqttSecurityException {NetworkModule[] networkModules = null;String[] serverURIs = options.getServerURIs();String[] array = null;if (serverURIs == null) {array = new String[] { address };} else if (serverURIs.length == 0) {array = new String[] { address };} else {array = serverURIs;}// 根据传入的url集合长度建立网络模块集合networkModules = new NetworkModule[array.length];for (int i = 0; i < array.length; i++) {// 取出对应的url和options(连接设置),进一步创建networkModules[i] = createNetworkModule(array[i], options);}return networkModules;}
我们继续看
// FACTORY_SERVICE_LOADER对应NetworkModuleFactory.class和NetworkModuleService.class.getClassLoader()的集合private static final ServiceLoader<NetworkModuleFactory> FACTORY_SERVICE_LOADER = ServiceLoader.load(NetworkModuleFactory.class, NetworkModuleService.class.getClassLoader());public static NetworkModule createInstance(String address, MqttConnectOptions options, String clientId)throws MqttException, IllegalArgumentException{try {URI brokerUri = new URI(address);applyRFC3986AuthorityPatch(brokerUri);// 获取到url的schemeString scheme = brokerUri.getScheme().toLowerCase();// 通过scheme匹配不同的NetWorkModule实现创建不同的网络模块for (NetworkModuleFactory factory : FACTORY_SERVICE_LOADER) {// 获取scheme,TLS对应ssl://,TCP对应tcp://if (factory.getSupportedUriSchemes().contains(scheme)) {return factory.createNetworkModule(brokerUri, options, clientId);}}/** To throw an IllegalArgumentException exception matches the previous behavior of* MqttConnectOptions.validateURI(String), but it would be nice to provide something more meaningful.*/throw new IllegalArgumentException(brokerUri.toString());} catch (URISyntaxException e) {throw new IllegalArgumentException(address, e);}}
创建网络模块这里看着比较混乱,我来简单总结一下。
String[] hostArray = new String[]{"ssl://127.0.0.1:8883","tcp://127.0.0.1:1883"};mqttConnectOptions.setServerURIs(hostArray);
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {synchronized (conLock) {if (isDisconnected() && !closePending) {// 将连接状态置为CONNECTINGconState = CONNECTING;// 首次连接mqtt时,发送Connect控制报文。conOptions = options;MqttConnect connect = new MqttConnect(client.getClientId(),conOptions.getMqttVersion(),conOptions.isCleanSession(),conOptions.getKeepAliveInterval(),conOptions.getUserName(),conOptions.getPassword(),conOptions.getWillMessage(),conOptions.getWillDestination());this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());this.clientState.setCleanSession(conOptions.isCleanSession());this.clientState.setMaxInflight(conOptions.getMaxInflight());tokenStore.open();// 创建一个后台线程进行连接操作ConnectBG conbg = new ConnectBG(this, token, connect, executorService);// 线程开启conbg.start();}else {if (isClosed() || closePending) {throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);} else if (isConnecting()) {throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);} else if (isDisconnecting()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);} else {throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);}}}}
上面的代码中,我们需要注意两点
虽然是通过监听器connectActionListener创建连接,但是建立连接的过程最终还是会交给ClientComms处理
当与Broker建立连接时,需要发送一条控制报文为CONNECT类型的消息,如果连接成功,Broker会回执一条控制报文为CONNACK的消息,通知客户端已经与服务端成功建立连接。在Eclipse.paho中,MQTT官方支持的报文都被封装成对象,在发送和订阅消息时,根据一定的规则转换成对应的对象进行处理,非常的友好,下图是所有控制报文对应的消息类。

所以在这里会创建一个MqttConnect类型的消息用于建立连接。
下面是在后台线程中建立连接的过程
public void run() {Thread.currentThread().setName(threadName);MqttException mqttEx = null;try {// Reset an exception on existing delivery tokens.// This will have been set if disconnect occurred before delivery was// fully processed.MqttDeliveryToken[] toks = tokenStore.getOutstandingDelTokens();for (MqttDeliveryToken tok : toks) {tok.internalTok.setException(null);}// Save the connect token in tokenStore as failure can occur before sendtokenStore.saveToken(conToken,conPacket);// Connect to the server at the network level e.g. TCP socket and then// start the background processing threads before sending the connect// packet.// 取出不同的网络模块并建立通信NetworkModule networkModule = networkModules[networkModuleIndex];networkModule.start();// 创建线程用于消息接受receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());// 线程执行receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);// 创建线程用于消息发送sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());// 线程执行sender.start("MQTT Snd: "+getClient().getClientId(), executorService);// 执行回调线程callback.start("MQTT Call: "+getClient().getClientId(), executorService);// 发送Conn消息internalSend(conPacket, conToken);} catch (MqttException ex) {mqttEx = ex;} catch (Exception ex) {mqttEx = ExceptionHelper.createMqttException(ex);}// 如果有异常,断开连接if (mqttEx != null) {shutdownConnection(conToken, mqttEx);}}
上述代码需要注意下面几点
创建网络模块时,根据不同的协议创建不同的网络环境,以tcp为例,底层用的是socket通信,代码如下
public void start() throws IOException, MqttException {try {SocketAddress sockaddr = new InetSocketAddress(host, port);// 创建socket对象socket = factory.createSocket();// 通过socket进行连接socket.connect(sockaddr, conTimeout*1000);socket.setSoTimeout(1000);}catch (ConnectException ex) {throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);}}
发送connect消息的过程在这里不展开讲,小伙伴们只要知道这里发送了一条CONNECT类型消息即可,具体的发布/订阅内部实现我们留到下一章节详细讲解
Connect消息发送后,如果连接成功,会有一条connack类型的回执消息。这里我们也不展开讲,只是简单的看一下在订阅到connack消息后Eclipse paho又做了些什么,具体订阅的内部实现我们再次也不展开。在CommReceiver中封装了订阅消息的处理逻辑,并以后台线程的形式一直在获取订阅的数据,下面我们看下收到ack的代码
protected void notifyReceivedAck(MqttAck ack) throws MqttException {if (token == null) {// ....} else if (ack instanceof MqttConnack) {// 收到connack类型的消息,如果返回码是0,即为成功。int rc = ((MqttConnack) ack).getReturnCode();if (rc == 0) {synchronized (queueLock) {if (cleanSession) {clearState();// Add the connect token back in so that users can be// notified when connect completes.tokenStore.saveToken(token,ack);}// 初始化计数器为0inFlightPubRels = 0;actualInFlight = 0;// 将出站数据通过类型不同划分到发送队列和特殊消息队列中进行等待发送restoreInflightMessages();// 通知心跳发送器向broker发送心跳,并监控broker活跃connected();}} else {mex = ExceptionHelper.createMqttException(rc);throw mex;}clientComms.connectComplete((MqttConnack) ack, mex);notifyResult(ack, token, mex);tokenStore.removeToken(ack);// Notify the sender thread that there maybe work for it to do nowsynchronized (queueLock) {queueLock.notifyAll();}} else {notifyResult(ack, token, mex);releaseMessageId(ack.getMessageId());tokenStore.removeToken(ack);}checkQuiesceLock();}
上述代码需要注意
出站队列作为消息的中间媒介最终都会将消息转移到待发送队列进行发送操作,这里具体不展开,只需要了解在建立连接时会把出站队列中的消息放到待发送队列和特殊消息队列中即可。
连接建立后,为了保证Client和Broker的连接状态活跃,需要定期发送控制报文为PINGREQ的消息,如果收到PINGRESP。即证明服务器与客户端连接正常。这里Eclipse paho维护了一套自己的心跳探活机制。我们同样也留在下一个章节去详细了解。
至此,整个Eclipse paho作为客户端建立与Broker的过程就已经完成了。
我们回顾下整个流程
初始化MqttAsyncClient实例
验证连接URL合法性
初始化持久化存储对象,默认使用内存持久化
初始化ClientComms实例
初始化CommsTokenStore实例
初始化CommsCallBack实例
初始化ClientState实例
初始化对应队列
加载持久化存储中的数据,并且放到出站队列outboundqosX中
与服务器建立连接
初始化网络模块,根据不同的url对应的scheme加载不同的网络模块
通过ClientComms建立连接
创建MqttConn对象
开启后台线程
根据NeworkModule建立不同的网络环境,以tcp为例,建立socket连接
开启线程处理发送和订阅消息
发送MqttConn消息
接受到MqttConnack消息
将出站队列outboundqosX中的消息,待发送队列pendingMessage和特殊消息队列pendingFlows中
消息的发布、订阅内部实现
心跳机制与断线重连机制
消息在各个队列间是如何流转的
小伙伴们敬请期待吧~~~









