写在前面
通过上一个章节MQTT之Eclipse.Paho源码(一)--建立连接的介绍,我们已经将客户端和服务端建立了连接,本章我们就来实际看一下Eclipse.Paho是如何帮助我们进行生产和消费消息的。
消息发送
1.1构建消息并入队
IMqttDeliveryToken token = checkConnection().publish(topic, (MqttMssage) mqttMessage);
public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {final String methodName = "sendNoWait";// 判断当前的连接状态,如果连接正常或者未连接,但是控制报文消息是Connect类型,或者// 正在断开连接且报文是DISCONNECT类型则进入条件语句中处理if (isConnected() ||(!isConnected() && message instanceof MqttConnect) ||(isDisconnecting() && message instanceof MqttDisconnect)) {// 如果disconnect缓冲区被打开且缓冲区中有消息,同步到缓冲区红,如果持久化策略被打开,同时做持久化操作if(disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0){if(disconnectedMessageBuffer.isPersistBuffer()){this.clientState.persistBufferedMessage(message);}disconnectedMessageBuffer.putMessage(message, token);} else {// 若disconnectedMessageBuffer未打开,则证明连接正常,直接发送消息this.internalSend(message, token);}} else if(disconnectedMessageBuffer != null) {if(disconnectedMessageBuffer.isPersistBuffer()){this.clientState.persistBufferedMessage(message);}disconnectedMessageBuffer.putMessage(message, token);} else {throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);}}
下面来看一下发送的入口方法,也就是ClientState.send()里面的相关代码。
public void send(MqttWireMessage message, MqttToken token) throws MqttException {// 为发送的消息设置MessageId,如果qos=0,则不需要进行messageId的设置if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {if(message instanceof MqttPublish && (((MqttPublish) message).getMessage().getQos() != 0)){message.setMessageId(getNextMessageId());}else if(message instanceof MqttPubAck ||message instanceof MqttPubRec ||message instanceof MqttPubRel ||message instanceof MqttPubComp ||message instanceof MqttSubscribe ||message instanceof MqttSuback ||message instanceof MqttUnsubscribe ||message instanceof MqttUnsubAck){message.setMessageId(getNextMessageId());}}// 将message和token绑定 ,并且设置token的MessageIdif (token != null) {message.setToken(token);try {token.internalTok.setMessageID(message.getMessageId());} catch (Exception e) {}}// 根据消息的qos,将消息分发到不同的队列中if (message instanceof MqttPublish) { // 只有Publish类型的消息才需要进队列发送。synchronized (queueLock) {// 这里会对正在飞行的消息(消息发送出去但是还未收到ack,即未完成的消息)// 做一个限制,如果实际为着陆的消息>=设置的上线,则被拒绝并抛出异常if (actualInFlight >= this.maxInflight)throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);}MqttMessage innerMessage = ((MqttPublish) message).getMessage();switch(innerMessage.getQos()) {case 2:outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);persistence.put(getSendPersistenceKey(message), (MqttPublish) message);tokenStore.saveToken(token, message);break;case 1:outboundQoS1.put( Integer.valueOf(message.getMessageId()), message);persistence.put(getSendPersistenceKey(message), (MqttPublish) message);tokenStore.saveToken(token, message);break;}// 最后统一添加到待发送队列中pendingMessages.addElement(message);// 唤醒发送线程进行消息发送。queueLock.notifyAll();}} else {if (message instanceof MqttConnect) {// 如果消息是CONNECT类型,则进入到下面处理synchronized (queueLock) {tokenStore.saveToken(token, message);// 添加到特殊消息队列中,如果是CONNECT类型的消息,需要优先发送,所以放到了队首的位置pendingFlows.insertElementAt(message,0);// 唤醒发送线程进行消息发送。queueLock.notifyAll();}} else {if (message instanceof MqttPingReq) {// 如果消息是PING类型,将消息赋值给pingCommand用于后续心跳执行this.pingCommand = message;}// 如果消息是PubRel类型,则进入到下面处理else if (message instanceof MqttPubRel) {// 将消息加入到出站队列中,并且持久化起来outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);}else if (message instanceof MqttPubComp) {// 如果消息是PubComp类型,将持久化的数据清除,如果需要发送PubComp类型的// 消息,一般都是作为消费者的角色去发送的,而且消息类型是qos2// 所以发送出去之后,消息就算完全f发送成功了。persistence.remove(getReceivedPersistenceKey(message));}synchronized (queueLock) {if ( !(message instanceof MqttAck )) {tokenStore.saveToken(token, message);}// 最后统一添加到特殊消息队列中,同时唤醒发送线程。pendingFlows.addElement(message);queueLock.notifyAll();}}}}
这个方法可以说是在发送流程中是比较核心的,在这里需要注意一下几点
获取MessageId
private synchronized int getNextMessageId() throws MqttException {// 初始化MessageId,默认为0int startingMessageId = nextMsgId;int loopCount = 0;do {// 如果MessageId的最大值超过65535,则重置为1nextMsgId++;if ( nextMsgId > MAX_MSG_ID ) {nextMsgId = MIN_MSG_ID;}// 判断MessageId使用缓存inUseMsgIds中有使用记录,则继续循环,// 如果循环三次还未找到可用id,则抛出异常if (nextMsgId == startingMessageId) {loopCount++;if (loopCount == 2) {throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_NO_MESSAGE_IDS_AVAILABLE);}}} while( inUseMsgIds.containsKey( Integer.valueOf(nextMsgId)) );// 将messageId放到id使用记录缓存表中Integer id = Integer.valueOf(nextMsgId);inUseMsgIds.put(id, id);return nextMsgId;}
消息入队
// 特殊消息处理队列pendingFlows = new Vector();// qos2类型消息的出站队列outboundQoS2 = new Hashtable();// qos1类型消息的出站队列outboundQoS1 = new Hashtable();// qos0类型消息的出站队列outboundQoS0 = new Hashtable();// 发送队列Vector pendingMessages
所以消息在最终发送出去之前,会根据消息类型和消息质量QOS的不同分发到不同的队列中,
首先所有Publish类型的消息都会添加到pendingMessages队列,所有非Publish类型的消息都会添加到pendingFlow队列,同时,若是Publish类型,qos1和qos2的消息会分别添加到outboundQos1和outboundQos2中被等待发送。
发送线程唤醒
那么为什么要将消息分发到队列中而不是直接发送出去呢?这里先提前解释一下,整个MQTTClient发送和接收操作的实现其实都是基于生产-消费模型来做的。在建立连接时,会维护一个常驻的发送线程,线程中会不断的从pendingFlow队列和pendingMessages队列中获取消息,如果获取不到,线程就暂时等待,当有消息加入到队列中时,会主动唤醒发送线程从队列中取出这条消息,然后发送出去。这样做的好处是可以充分解耦生产和发送流程,其内部并不是生产一条消息后直接调用发送方法,而是通过线程通信的方式,让生产和发送流程独立。
1.2 发送消息
发送线程的核心方法主要在CommsSender类中,核心代码如下:
public void run() {sendThread = Thread.currentThread();sendThread.setName(threadName);MqttWireMessage message = null;// 同步修改状态为正在执行synchronized (lifecycle) {current_state = State.RUNNING;}try {State my_target;synchronized (lifecycle) {my_target = target_state;}// 这里使用while循环来保证守护线程while (my_target == State.RUNNING && (out != null)) {try {// 这里非常关键,通过clientState的get()方法去获取队列中的数据// 如果获取不到,线程将会处于等待状态message = clientState.get();if (message != null) {// 如果获取到消息且是ACK类型,则直接发送出去if (message instanceof MqttAck) {out.write(message);out.flush();} else {// 如果是非ACK消息,则使用token加锁进行同步发送操作MqttToken token = message.getToken();if (token == null) {token = tokenStore.getToken(message);}if (token != null) {synchronized (token) {out.write(message);try {out.flush();} catch (IOException ex) {if (!(message instanceof MqttDisconnect)) {throw ex;}}// 发送操作完成后,通知回调进行后续处理clientState.notifySent(message);}}}} else { // null messagesynchronized (lifecycle) {target_state = State.STOPPED;}}} catch (MqttException me) {handleRunException(message, me);} catch (Exception ex) {handleRunException(message, ex);}synchronized (lifecycle) {my_target = target_state;}} // end while} finally {synchronized (lifecycle) {current_state = State.STOPPED;sendThread = null;}}}
我们先来看一下从队列中获取消息的代码,这段代码是通过clientState.get()方法获得的,代码如下:
protected MqttWireMessage get() throws MqttException {MqttWireMessage result = null;synchronized (queueLock) {while (result == null) {if ((pendingMessages.isEmpty() && pendingFlows.isEmpty()) ||(pendingFlows.isEmpty() && actualInFlight >= this.maxInflight)) {try {// 如果发送队列pendingMessage和特殊消息队列pendingFlow都没有数据// 将线程设置为等待状态queueLock.wait();} catch (InterruptedException e) {}}if (pendingFlows == null || (!connected &&(pendingFlows.isEmpty() || !((MqttWireMessage)pendingFlows.elementAt(0) instanceof MqttConnect)))) {// pendingFlows为空或者(未连接且pengdingFlows中无数据或第一条数据不为MqttConnect// 返回nullreturn null;}if (!pendingFlows.isEmpty()) {// 如果不为空,获取特殊消息队列中的第一条数据,并且从队列中删除result = (MqttWireMessage)pendingFlows.remove(0);// 如果是MqttPubRel,增加计数if (result instanceof MqttPubRel) {inFlightPubRels++;}checkQuiesceLock();} else if (!pendingMessages.isEmpty()) {// 如果发送队列不为空且实际为着陆的消息数 < 最大飞行消息数if (actualInFlight < this.maxInflight) {// 从队列中获取第一条消息并从队列中删除result = (MqttWireMessage)pendingMessages.elementAt(0);pendingMessages.removeElementAt(0);// 增加飞行消息计数器actualInFlight++;} else {}}}}// 最后返回消息return result;}
这里要注意一下,特殊队列中的消息,即ACK类型的消息发送优先级是要高于Publish类型的消息的。
1.3发送后续处理
protected void notifySent(MqttWireMessage message) {this.lastOutboundActivity = System.nanoTime();MqttToken token = message.getToken();if (token == null) {token = tokenStore.getToken(message);if (token == null) return;}// 设置跟踪token的相关状态token.internalTok.notifySent();if (message instanceof MqttPingReq) {// 如果是PingReq类型的消息,心跳出站计数器+1synchronized (pingOutstandingLock) {long time = System.nanoTime();synchronized (pingOutstandingLock) {lastPing = time;pingOutstanding++;}}}else if (message instanceof MqttPublish) {// 如果是Publish类型额消息且QOS == 0if (((MqttPublish)message).getMessage().getQos() == 0) {// token完成标记token.internalTok.markComplete(null, null);// 操作完成时间回调callback.asyncOperationComplete(token);// 实际飞行消息计数 - 1decrementInFlight();// 释放MessageId,即从inUseMsgIds中移除使用记录releaseMessageId(message.getMessageId());// 移除tokentokenStore.removeToken(message);checkQuiesceLock();}}}
这里需要说明的是callback.asyncOperationComplete(token);
代码如下
public void asyncOperationComplete(MqttToken token) {// 这里会判断当前是否为运行状态,如果是运行状态,则直接在回调线程中处理if (isRunning()) {// 将token添加到完成队列中,同时唤醒回调线程处理completeQueue.addElement(token);synchronized (workAvailable) {workAvailable.notifyAll();}} else {// 若回调线程不在运行,则在调用者线程中处理try {handleActionComplete(token);} catch (Throwable ex) {clientComms.shutdownConnection(null, new MqttException(ex));}}}
以上就是发送消息的过程,下面我们来看消息是如果接收的~~
二、消息接收
相比较于消息发送来说,消息接收的代码更简单一些。与发送消息的实现一样,消息接收也是维护了一个守护线程,用来不断的从socket中读取数据。消息接收的具体实现在CommsReceiver的run()方法中,线程开启是在创建连接的后台线程里开启的。我们来看一下具体实现
public void run() {MqttToken token = null;synchronized (lifecycle) {current_state = State.RUNNING;}try {State my_target;synchronized (lifecycle) {my_target = target_state;}// while循环将线程常驻while (my_target == State.RUNNING && (in != null)) {try {if (in.available() > 0) {synchronized (lifecycle) {current_state = State.RECEIVING;}}// 从socket中获取数据MqttWireMessage message = in.readMqttWireMessage();synchronized (lifecycle) {current_state = State.RUNNING;}if (message instanceof MqttAck) {token = tokenStore.getToken(message);if (token!=null) {synchronized (token) {// 如果消息是ACK类型,则通知处理ACK类型消息clientState.notifyReceivedAck((MqttAck)message);}} else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {} else {throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);}} else {if (message != null) {// 如果消息是非ACK类型,则通知处理正常消息clientState.notifyReceivedMsg(message);}}}catch (MqttException ex) {synchronized (lifecycle) {target_state = State.STOPPED;}clientComms.shutdownConnection(token, ex);}catch (IOException ioe) {synchronized (lifecycle) {target_state = State.STOPPED;}if (!clientComms.isDisconnecting()) {clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));}}finally {synchronized (lifecycle) {current_state = State.RUNNING;}}synchronized (lifecycle) {my_target = target_state;}} // end while} finally {synchronized (lifecycle) {current_state = State.STOPPED;}} // end tryrecThread = null;
这里有三点需要注意
关于从socket中读取消息
如果接收的是ACK类型的消息,进行ACK消息处理
这里我们具体看一下代码:
protected void notifyReceivedAck(MqttAck ack) throws MqttException {MqttToken token = tokenStore.getToken(ack);MqttException mex = null;if (token == null) {} else if (ack instanceof MqttPubRec) {// 如果ACK是PubRec类型,证明消息qos=2,此时需要会送一条Rel消息回执// 下面就是创建PubRel类型消息并发送的过程MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);this.send(rel, token);} else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {// 如果ACK是MqttPubAck类型或者MqttPubComp,// 证明qos1或qos2的消息流程已经结束,之后通过下面的方法触发回调通知notifyResult(ack, token, mex);} else if (ack instanceof MqttPingResp) {// 如果ACK是MqttPingResp类型,心跳出站计数器 - 1,如果小于0,则取0synchronized (pingOutstandingLock) {pingOutstanding = Math.max(0, pingOutstanding-1);// 触发回调通知notifyResult(ack, token, mex);if (pingOutstanding == 0) {tokenStore.removeToken(ack);}}} else if (ack instanceof MqttConnack) {// 如果ACK是MqttConnack类型int rc = ((MqttConnack) ack).getReturnCode();if (rc == 0) {synchronized (queueLock) {// 如果连接成功且cleanSession设置诶true,清空本地所有队列和缓存if (cleanSession) {clearState();tokenStore.saveToken(token,ack);}// 重置PubRel类型且在飞行状态的消息计数器inFlightPubRels = 0;// 重置实际消息状态为飞行状态的计数器actualInFlight = 0;// 重新发送缓存中为着陆(即飞行状态)的消息restoreInflightMessages();// 开启心跳connected();}} else {mex = ExceptionHelper.createMqttException(rc);throw mex;}// 连接完成,修改状态clientComms.connectComplete((MqttConnack) ack, mex);// 通知回调notifyResult(ack, token, mex);tokenStore.removeToken(ack);synchronized (queueLock) {queueLock.notifyAll();}} else {// 如果其他类型的消息,通知回调,释放messageId等操作notifyResult(ack, token, mex);releaseMessageId(ack.getMessageId());tokenStore.removeToken(ack);}checkQuiesceLock();}
如果接收的是非ACK类型的消息,通知进行正常的消息处理
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {if (!quiescing) {if (message instanceof MqttPublish) {// 如果消息是Publish类型,判断qos并进行后续操作MqttPublish send = (MqttPublish) message;switch (send.getMessage().getQos()) {case 0:// 如果是qos0或qos1的消息,直接触发回调,在回调中会处理缓存等entitycase 1:if (callback != null) {callback.messageArrived(send);}break;case 2:// 如果是qos2,先进行持久化操作,再放到qos2的入站队列中,再发送一条// PubRec类型的消息persistence.put(getReceivedPersistenceKey(message),(MqttPublish) message);inboundQoS2.put( Integer.valueOf(send.getMessageId()), send);this.send(new MqttPubRec(send), null);break;default:}} else if (message instanceof MqttPubRel) {// 如果从inboundQoS2取到消息,通过回调线程处理发送PubComp,// 清空在inboundQoS2的这队列MqttPublish sendMsg = (MqttPublish) inboundQoS2.get( Integer.valueOf(message.getMessageId()));if (sendMsg != null) {if (callback != null) {callback.messageArrived(sendMsg);}} else {MqttPubComp pubComp = new MqttPubComp(message.getMessageId());this.send(pubComp, null);}}}}
注意,在上述方法中,会通知到回调线程处理,在回调线程CommsCallback.run()的handleActionComplete()方法中,会调用ClientState.notifyComplete()方法清除缓存,具体代码如下:
protected void notifyComplete(MqttToken token) throws MqttException {MqttWireMessage message = token.internalTok.getWireMessage();if (message != null && message instanceof MqttAck) {MqttAck ack = (MqttAck) message;if (ack instanceof MqttPubAck) {// 如果消息为MqttPubAck,则qos1的消息流程结束,删除这条信息对应// 的缓存数据persistence.remove(getSendPersistenceKey(message));persistence.remove(getSendBufferedPersistenceKey(message));outboundQoS1.remove(Integer.valueOf(ack.getMessageId()));// 将未着陆消息计数器 - 1decrementInFlight();// 释放messageIdreleaseMessageId(message.getMessageId());tokenStore.removeToken(message);} else if (ack instanceof MqttPubComp) {// 如果消息为MqttPubComp,则qos2的消息流程结束,删除这条信息对应// 的缓存数据persistence.remove(getSendPersistenceKey(message));persistence.remove(getSendConfirmPersistenceKey(message));persistence.remove(getSendBufferedPersistenceKey(message));outboundQoS2.remove( Integer.valueOf(ack.getMessageId()));// 将Rel类型未着陆消息计数器 - 1inFlightPubRels--;// 将未着陆消息计数器 - 1decrementInFlight();// 释放messageIdreleaseMessageId(message.getMessageId());tokenStore.removeToken(message);}checkQuiesceLock();}}
在CommsCallback.run()的handleMessage()方法中,会处理收到qos1和qos2消息后的回执发送操作,同时如果是qos2,会清空inboundQoS2队列中对应的Message,具体代码如下
private void handleMessage(MqttPublish publishMessage)throws MqttException, Exception {String destName = publishMessage.getTopicName();// 内部会清空inboundQoS2中对应的messagedeliverMessage(destName, publishMessage.getMessageId(),publishMessage.getMessage());// 如果是自动应答,发送对应的回执。if (!this.manualAcks) {if (publishMessage.getMessage().getQos() == 1) {this.clientComms.internalSend(new MqttPubAck(publishMessage),new MqttToken(clientComms.getClient().getClientId()));} else if (publishMessage.getMessage().getQos() == 2) {this.clientComms.deliveryComplete(publishMessage);MqttPubComp pubComp = new MqttPubComp(publishMessage);this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId()));}}}
总结
上述就是整个MQTTClient内部发送和接收消息的整个流程,在这里做个简单的总结
发送消息
1. 连接时会创建后台常驻的发送线程,发送线程会从对应的队列中获取数据,如果队列中没有待发送的数据,发送线程会置为等待状态。涉及到的队列如下:
pendingMessage: publish消息待发送队列
pendingFlow: 特殊消息待发送队列(以ACK为主),特殊消息优先级高于Publish消息
outboundQoS1:qos1类型的消息出站队列
outboundQoS2:qos2类型的消息出站队列
2. 将消息放到对应的队列后,唤醒发送线程
3.发送线程从消息队列中获取数据,将数据发送,同时删除发送队列(pendingFlow或pendingMessages)中对应的数据
4. 发送后通知回调,如果qos0的消息,释放MessageId,变更消息未着陆计数器,变更消息标识为完成状态,如果是PingReq类型的消息,自增心跳出站计数器
接收消息
1. 连接时会创建后台常驻的接收线程,接收线程会通过socket中阻塞式的获取数据。
2. 如果接收到ack类型的消息,判断不同的消息类型进行处理
PubRec类型:回执PubRel类型消息
PubACK或者PubComp类型:消息处理完成,回调通知并在ClientStat的notifyComplete()方法,根据qos从发送队列和出站队列中移除该消息
PingResp类型:心跳出站计数器 - 1,回调通知
ConneACK类型:重置消息未着陆计数器和PubRel消息未着陆计数器,发起心跳,重发未着陆消息。
3. 如果接收到非ack类型的消息,判断不同的消息类型进行处理
Publish类型:
qos0或qos1: 执行回调CommsCallback中的handleMessage()方法,将回执发送出去
qos2: 将消息持久化,并放到入站消息队列inboundQoS2中,同时发送PubRec消息
PubRel类型:
inboundQoS2中存在该message,将调用CommsCallback的handleMessage发送PubComp,同时清除inboundQoS2中的该条Message缓存
若不存在,直接发送PubComp最终会通过CommeCallback的handleMessage方法中的deliverMessage()方法回调到我们的消息处理器中。
以上就是MQTTClient作为消息发送和接收的源码分析,目前我们还遗留了几个问题
Client与Broker的心跳探活机制
Client与Broker连接断开后的重连机制以及重连时对应的未着陆消息重发策略
这些内容将会在下一个章节给大家介绍,敬请期待吧~






