暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MQTT系列-Eclipse.Paho源码分析(二)-消息的发送与接收

糖爸的架构师之路 2021-06-24
2013

写在前面

        通过上一个章节MQTT之Eclipse.Paho源码(一)--建立连接的介绍,我们已经将客户端和服务端建立了连接,本章我们就来实际看一下Eclipse.Paho是如何帮助我们进行生产和消费消息的。


消息发送

1.1构建消息并入队

我们还是通过源代码入手,在上一章节中,我们提到了MqttPahoMessageHandler这个消息处理器,它为发送消息提供了统一的出口。通过AbstractMessageHandler分发到不同的出站适配器中,所以我们这里还是要通过MqttPahoMessageHandler的publish方法的实现作为入手点。在上一章中,我们提到了一个publish方法实现中一个非常重要的操作,代码如下
    IMqttDeliveryToken token = checkConnection().publish(topic, (MqttMssage) mqttMessage);
    在checkConnection中,我们已经建立好了连接,那么接下来我们就需要将消息发送出去。对于Eclipse.Paho来说,发送消息的入口在ClientState.send()方法中,但是进入send()方法前,针对Publish控制报文类型的消息,会做一些前置的缓存操作.代码如下:
        public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
      final String methodName = "sendNoWait";
      // 判断当前的连接状态,如果连接正常或者未连接,但是控制报文消息是Connect类型,或者
      // 正在断开连接且报文是DISCONNECT类型则进入条件语句中处理
      if (isConnected() ||
      (!isConnected() && message instanceof MqttConnect) ||
      (isDisconnecting() && message instanceof MqttDisconnect)) {
      // 如果disconnect缓冲区被打开且缓冲区中有消息,同步到缓冲区红,如果持久化策略被打开,同时做持久化操作
      if(disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0){


      if(disconnectedMessageBuffer.isPersistBuffer()){
      this.clientState.persistBufferedMessage(message);
      }
      disconnectedMessageBuffer.putMessage(message, token);
      } else {
      // 若disconnectedMessageBuffer未打开,则证明连接正常,直接发送消息
      this.internalSend(message, token);
      }
      } else if(disconnectedMessageBuffer != null) {


      if(disconnectedMessageBuffer.isPersistBuffer()){
      this.clientState.persistBufferedMessage(message);
      }
      disconnectedMessageBuffer.putMessage(message, token);
      } else {


      throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
      }
      }
      上面的操作其实就是如果连接状态有异常,会将消息缓存起来,等待之后连接恢复时重新将这些消息发送出去。

      下面来看一下发送的入口方法,也就是ClientState.send()里面的相关代码。

        public void send(MqttWireMessage message, MqttToken token) throws MqttException {
        // 为发送的消息设置MessageId,如果qos=0,则不需要进行messageId的设置
        if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
        if(message instanceof MqttPublish && (((MqttPublish) message).getMessage().getQos() != 0)){
        message.setMessageId(getNextMessageId());
        }else if(message instanceof MqttPubAck ||
        message instanceof MqttPubRec ||
        message instanceof MqttPubRel ||
        message instanceof MqttPubComp ||
        message instanceof MqttSubscribe ||
        message instanceof MqttSuback ||
        message instanceof MqttUnsubscribe ||
        message instanceof MqttUnsubAck){
        message.setMessageId(getNextMessageId());
        }
        }
        // 将message和token绑定 ,并且设置token的MessageId
        if (token != null) {
        message.setToken(token);
        try {
        token.internalTok.setMessageID(message.getMessageId());
        } catch (Exception e) {
        }
        }
        // 根据消息的qos,将消息分发到不同的队列中
        if (message instanceof MqttPublish) { // 只有Publish类型的消息才需要进队列发送。
        synchronized (queueLock) {
        // 这里会对正在飞行的消息(消息发送出去但是还未收到ack,即未完成的消息)
        // 做一个限制,如果实际为着陆的消息>=设置的上线,则被拒绝并抛出异常
        if (actualInFlight >= this.maxInflight)
        throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
        }
        MqttMessage innerMessage = ((MqttPublish) message).getMessage();
        switch(innerMessage.getQos()) {
        case 2:
        outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);
        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
        tokenStore.saveToken(token, message);
        break;
        case 1:
        outboundQoS1.put( Integer.valueOf(message.getMessageId()), message);
        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
        tokenStore.saveToken(token, message);
        break;
        }
        // 最后统一添加到待发送队列中
        pendingMessages.addElement(message);
        // 唤醒发送线程进行消息发送。
        queueLock.notifyAll();
        }
        } else {
        if (message instanceof MqttConnect) {// 如果消息是CONNECT类型,则进入到下面处理
        synchronized (queueLock) {
        tokenStore.saveToken(token, message);
        // 添加到特殊消息队列中,如果是CONNECT类型的消息,需要优先发送,所以放到了队首的位置
        pendingFlows.insertElementAt(message,0);
        // 唤醒发送线程进行消息发送。
        queueLock.notifyAll();
        }
        } else {
        if (message instanceof MqttPingReq) {
        // 如果消息是PING类型,将消息赋值给pingCommand用于后续心跳执行
        this.pingCommand = message;
        }
        // 如果消息是PubRel类型,则进入到下面处理
        else if (message instanceof MqttPubRel) {
        // 将消息加入到出站队列中,并且持久化起来
        outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);
        persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
        }
        else if (message instanceof MqttPubComp) {
        // 如果消息是PubComp类型,将持久化的数据清除,如果需要发送PubComp类型的
        // 消息,一般都是作为消费者的角色去发送的,而且消息类型是qos2
        // 所以发送出去之后,消息就算完全f发送成功了。
        persistence.remove(getReceivedPersistenceKey(message));
        }
        synchronized (queueLock) {
        if ( !(message instanceof MqttAck )) {
        tokenStore.saveToken(token, message);
        }
        // 最后统一添加到特殊消息队列中,同时唤醒发送线程。
        pendingFlows.addElement(message);
        queueLock.notifyAll();
        }
        }
        }
        }

        这个方法可以说是在发送流程中是比较核心的,在这里需要注意一下几点

        • 获取MessageId

        MessageId代表消息的唯一标识,在处理QOS1或者QOS2类型的消息时是需要和服务器发送一条或者多条回执的。当消息的发送到Broker后收到Broker回执或在消息被消费后向Broker发送回执时都需要携带MessageId,用来查找和匹配消息。具体生成MessageId的逻辑如下
           private synchronized int getNextMessageId() throws MqttException {
          // 初始化MessageId,默认为0
          int startingMessageId = nextMsgId;
          int loopCount = 0;
          do {
          // 如果MessageId的最大值超过65535,则重置为1
          nextMsgId++;
          if ( nextMsgId > MAX_MSG_ID ) {
          nextMsgId = MIN_MSG_ID;
          }
          // 判断MessageId使用缓存inUseMsgIds中有使用记录,则继续循环,
          // 如果循环三次还未找到可用id,则抛出异常
          if (nextMsgId == startingMessageId) {
          loopCount++;
          if (loopCount == 2) {
          throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_NO_MESSAGE_IDS_AVAILABLE);
          }
          }
          } while( inUseMsgIds.containsKey( Integer.valueOf(nextMsgId)) );
          // 将messageId放到id使用记录缓存表中
          Integer id = Integer.valueOf(nextMsgId);
          inUseMsgIds.put(id, id);
          return nextMsgId;
          }


          • 消息入队

          上一章我们介绍过,Eclipse.Paho发送和接收消息都是要先入队列的,涉及到的发送队列定义在ClientState中,如下所示:
                // 特殊消息处理队列
            pendingFlows = new Vector();
            // qos2类型消息的出站队列
            outboundQoS2 = new Hashtable();
            // qos1类型消息的出站队列
            outboundQoS1 = new Hashtable();
            // qos0类型消息的出站队列
            outboundQoS0 = new Hashtable();
            // 发送队列
            Vector pendingMessages

            所以消息在最终发送出去之前,会根据消息类型和消息质量QOS的不同分发到不同的队列中

                     首先所有Publish类型的消息都会添加到pendingMessages队列,所有非Publish类型的消息都会添加到pendingFlow队列,同时,若是Publish类型,qos1和qos2的消息会分别添加到outboundQos1和outboundQos2中被等待发送。


            • 发送线程唤醒

                     那么为什么要将消息分发到队列中而不是直接发送出去呢?这里先提前解释一下,整个MQTTClient发送和接收操作的实现其实都是基于生产-消费模型来做的。在建立连接时,会维护一个常驻的发送线程,线程中会不断的从pendingFlow队列和pendingMessages队列中获取消息,如果获取不到,线程就暂时等待,当有消息加入到队列中时,会主动唤醒发送线程从队列中取出这条消息,然后发送出去。这样做的好处是可以充分解耦生产和发送流程,其内部并不是生产一条消息后直接调用发送方法,而是通过线程通信的方式,让生产和发送流程独立。


            1.2 发送消息

            发送线程的核心方法主要在CommsSender类中,核心代码如下:

              public void run() {
              sendThread = Thread.currentThread();
              sendThread.setName(threadName);
              MqttWireMessage message = null;
              // 同步修改状态为正在执行
              synchronized (lifecycle) {
              current_state = State.RUNNING;
              }
              try {
              State my_target;
              synchronized (lifecycle) {
              my_target = target_state;
              }
              // 这里使用while循环来保证守护线程
              while (my_target == State.RUNNING && (out != null)) {
              try {
              // 这里非常关键,通过clientState的get()方法去获取队列中的数据
              // 如果获取不到,线程将会处于等待状态
              message = clientState.get();
              if (message != null) {
              // 如果获取到消息且是ACK类型,则直接发送出去
              if (message instanceof MqttAck) {
              out.write(message);
              out.flush();
              } else {
              // 如果是非ACK消息,则使用token加锁进行同步发送操作
              MqttToken token = message.getToken();
              if (token == null) {
              token = tokenStore.getToken(message);
              }
              if (token != null) {
              synchronized (token) {
              out.write(message);
              try {
              out.flush();
              } catch (IOException ex) {
              if (!(message instanceof MqttDisconnect)) {
              throw ex;
              }
              }
              // 发送操作完成后,通知回调进行后续处理
              clientState.notifySent(message);
              }
              }
              }
              } else { // null message
              synchronized (lifecycle) {
              target_state = State.STOPPED;
              }
              }
              } catch (MqttException me) {
              handleRunException(message, me);
              } catch (Exception ex) {
              handleRunException(message, ex);
              }
              synchronized (lifecycle) {
              my_target = target_state;
              }
              } // end while
              } finally {
              synchronized (lifecycle) {
              current_state = State.STOPPED;
              sendThread = null;
              }
              }
              }

              我们先来看一下从队列中获取消息的代码,这段代码是通过clientState.get()方法获得的,代码如下:

                protected MqttWireMessage get() throws MqttException {
                MqttWireMessage result = null;
                synchronized (queueLock) {
                while (result == null) {
                if ((pendingMessages.isEmpty() && pendingFlows.isEmpty()) ||
                (pendingFlows.isEmpty() && actualInFlight >= this.maxInflight)) {
                try {
                // 如果发送队列pendingMessage和特殊消息队列pendingFlow都没有数据
                // 将线程设置为等待状态
                queueLock.wait();
                } catch (InterruptedException e) {
                }
                }
                if (pendingFlows == null || (!connected &&
                (pendingFlows.isEmpty() || !((MqttWireMessage)pendingFlows.elementAt(0) instanceof MqttConnect)))) {
                // pendingFlows为空或者(未连接且pengdingFlows中无数据或第一条数据不为MqttConnect
                // 返回null
                return null;
                }
                if (!pendingFlows.isEmpty()) {
                // 如果不为空,获取特殊消息队列中的第一条数据,并且从队列中删除


                result = (MqttWireMessage)pendingFlows.remove(0);
                // 如果是MqttPubRel,增加计数
                if (result instanceof MqttPubRel) {
                inFlightPubRels++;


                }
                checkQuiesceLock();
                } else if (!pendingMessages.isEmpty()) {
                // 如果发送队列不为空且实际为着陆的消息数 < 最大飞行消息数
                if (actualInFlight < this.maxInflight) {
                // 从队列中获取第一条消息并从队列中删除
                result = (MqttWireMessage)pendingMessages.elementAt(0);
                pendingMessages.removeElementAt(0);
                // 增加飞行消息计数器
                actualInFlight++;
                } else {
                }
                }
                }
                }
                // 最后返回消息
                return result;
                }

                这里要注意一下,特殊队列中的消息,即ACK类型的消息发送优先级是要高于Publish类型的消息的。


                1.3发送后续处理

                  protected void notifySent(MqttWireMessage message) {
                  this.lastOutboundActivity = System.nanoTime();
                  MqttToken token = message.getToken();
                  if (token == null) {
                  token = tokenStore.getToken(message);
                  if (token == null) return;
                  }
                  // 设置跟踪token的相关状态
                  token.internalTok.notifySent();
                  if (message instanceof MqttPingReq) {
                  // 如果是PingReq类型的消息,心跳出站计数器+1
                  synchronized (pingOutstandingLock) {
                  long time = System.nanoTime();
                  synchronized (pingOutstandingLock) {
                  lastPing = time;
                  pingOutstanding++;
                  }


                  }
                  }
                  else if (message instanceof MqttPublish) {
                  // 如果是Publish类型额消息且QOS == 0
                  if (((MqttPublish)message).getMessage().getQos() == 0) {
                  // token完成标记
                  token.internalTok.markComplete(null, null);
                  // 操作完成时间回调
                  callback.asyncOperationComplete(token);
                  // 实际飞行消息计数 - 1
                  decrementInFlight();
                  // 释放MessageId,即从inUseMsgIds中移除使用记录
                  releaseMessageId(message.getMessageId());
                  // 移除token
                  tokenStore.removeToken(message);
                  checkQuiesceLock();
                  }
                  }
                  }

                  这里需要说明的是callback.asyncOperationComplete(token);

                  代码如下

                    public void asyncOperationComplete(MqttToken token) {
                    // 这里会判断当前是否为运行状态,如果是运行状态,则直接在回调线程中处理
                    if (isRunning()) {
                    // 将token添加到完成队列中,同时唤醒回调线程处理
                    completeQueue.addElement(token);
                    synchronized (workAvailable) {
                    workAvailable.notifyAll();
                    }
                    } else {
                    // 若回调线程不在运行,则在调用者线程中处理
                    try {
                    handleActionComplete(token);
                    } catch (Throwable ex) {
                    clientComms.shutdownConnection(null, new MqttException(ex));
                    }
                    }
                    }
                    handleActionComplete(token);方法中会触发deliveryComplete()方法回调,如果开启了事件回调,会发送对应的事件MqttMessageDeliveredEvent,我们只要监听这个事件,就可以收到消息发送完成的事件通知。

                    以上就是发送消息的过程,下面我们来看消息是如果接收的~~


                    二、消息接收

                    相比较于消息发送来说,消息接收的代码更简单一些。与发送消息的实现一样,消息接收也是维护了一个守护线程,用来不断的从socket中读取数据。消息接收的具体实现在CommsReceiver的run()方法中,线程开启是在创建连接的后台线程里开启的。我们来看一下具体实现

                      public void run() {
                      MqttToken token = null;
                      synchronized (lifecycle) {
                      current_state = State.RUNNING;
                      }
                      try {
                      State my_target;
                      synchronized (lifecycle) {
                      my_target = target_state;
                      }
                      // while循环将线程常驻
                      while (my_target == State.RUNNING && (in != null)) {
                      try {
                      if (in.available() > 0) {
                      synchronized (lifecycle) {
                      current_state = State.RECEIVING;
                      }
                      }
                      // 从socket中获取数据
                      MqttWireMessage message = in.readMqttWireMessage();
                      synchronized (lifecycle) {
                      current_state = State.RUNNING;
                      }
                      if (message instanceof MqttAck) {
                      token = tokenStore.getToken(message);
                      if (token!=null) {
                      synchronized (token) {
                      // 如果消息是ACK类型,则通知处理ACK类型消息
                      clientState.notifyReceivedAck((MqttAck)message);
                      }
                      } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
                      } else {
                      throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                      }
                      } else {
                      if (message != null) {
                      // 如果消息是非ACK类型,则通知处理正常消息
                      clientState.notifyReceivedMsg(message);
                      }
                      }
                      }
                      catch (MqttException ex) {
                      synchronized (lifecycle) {
                      target_state = State.STOPPED;
                      }


                      clientComms.shutdownConnection(token, ex);
                      }
                      catch (IOException ioe) {
                      synchronized (lifecycle) {
                      target_state = State.STOPPED;
                      }
                      if (!clientComms.isDisconnecting()) {
                      clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
                      }
                      }
                      finally {
                      synchronized (lifecycle) {
                      current_state = State.RUNNING;
                      }
                      }
                      synchronized (lifecycle) {
                      my_target = target_state;
                      }
                      } // end while
                      } finally {
                      synchronized (lifecycle) {
                      current_state = State.STOPPED;
                      }
                      } // end try
                      recThread = null;

                      这里有三点需要注意


                      • 关于从socket中读取消息

                      因为socket.getInputStream()一直在阻塞,如果没有消息是读不到message的,因此在这里的while循环也没有无限制 的运行下去,只有在有消息的时候才往下走。socket默认是阻塞的,就是在读的时候如果读不到资源就会一直等待,直到超时(如果设置了超时时间的话)。 如果服务端和客户端都在读的话而没有写的话就会一直阻塞。我们可以使用SocketChannel,设置socket的通道,使其变成非阻塞的。


                      • 如果接收的是ACK类型的消息,进行ACK消息处理

                      这里我们具体看一下代码:

                        protected void notifyReceivedAck(MqttAck ack) throws MqttException {


                        MqttToken token = tokenStore.getToken(ack);
                        MqttException mex = null;


                        if (token == null) {
                        } else if (ack instanceof MqttPubRec) {
                        // 如果ACK是PubRec类型,证明消息qos=2,此时需要会送一条Rel消息回执
                        // 下面就是创建PubRel类型消息并发送的过程
                        MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
                        this.send(rel, token);
                        } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
                        // 如果ACK是MqttPubAck类型或者MqttPubComp,
                        // 证明qos1或qos2的消息流程已经结束,之后通过下面的方法触发回调通知
                        notifyResult(ack, token, mex);
                        } else if (ack instanceof MqttPingResp) {
                        // 如果ACK是MqttPingResp类型,心跳出站计数器 - 1,如果小于0,则取0
                        synchronized (pingOutstandingLock) {
                        pingOutstanding = Math.max(0, pingOutstanding-1);
                        // 触发回调通知
                        notifyResult(ack, token, mex);
                        if (pingOutstanding == 0) {
                        tokenStore.removeToken(ack);
                        }
                        }
                        } else if (ack instanceof MqttConnack) {
                        // 如果ACK是MqttConnack类型
                        int rc = ((MqttConnack) ack).getReturnCode();
                        if (rc == 0) {
                        synchronized (queueLock) {
                        // 如果连接成功且cleanSession设置诶true,清空本地所有队列和缓存
                        if (cleanSession) {
                        clearState();
                        tokenStore.saveToken(token,ack);
                        }
                        // 重置PubRel类型且在飞行状态的消息计数器
                        inFlightPubRels = 0;
                        // 重置实际消息状态为飞行状态的计数器
                        actualInFlight = 0;
                        // 重新发送缓存中为着陆(即飞行状态)的消息
                        restoreInflightMessages();
                        // 开启心跳
                        connected();
                        }
                        } else {
                        mex = ExceptionHelper.createMqttException(rc);
                        throw mex;
                        }
                        // 连接完成,修改状态
                        clientComms.connectComplete((MqttConnack) ack, mex);
                        // 通知回调
                        notifyResult(ack, token, mex);
                        tokenStore.removeToken(ack);
                        synchronized (queueLock) {
                        queueLock.notifyAll();
                        }
                        } else {
                        // 如果其他类型的消息,通知回调,释放messageId等操作
                        notifyResult(ack, token, mex);
                        releaseMessageId(ack.getMessageId());
                        tokenStore.removeToken(ack);
                        }
                        checkQuiesceLock();
                        }


                        • 如果接收的是非ACK类型的消息,通知进行正常的消息处理

                          protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
                          if (!quiescing) {
                          if (message instanceof MqttPublish) {
                          // 如果消息是Publish类型,判断qos并进行后续操作
                          MqttPublish send = (MqttPublish) message;
                          switch (send.getMessage().getQos()) {
                          case 0:
                          // 如果是qos0或qos1的消息,直接触发回调,在回调中会处理缓存等entity
                          case 1:
                          if (callback != null) {
                          callback.messageArrived(send);
                          }
                          break;
                          case 2:
                          // 如果是qos2,先进行持久化操作,再放到qos2的入站队列中,再发送一条
                          // PubRec类型的消息
                          persistence.put(getReceivedPersistenceKey(message),
                          (MqttPublish) message);
                          inboundQoS2.put( Integer.valueOf(send.getMessageId()), send);
                          this.send(new MqttPubRec(send), null);
                          break;
                          default:
                          }
                          } else if (message instanceof MqttPubRel) {
                          // 如果从inboundQoS2取到消息,通过回调线程处理发送PubComp,
                          // 清空在inboundQoS2的这队列
                          MqttPublish sendMsg = (MqttPublish) inboundQoS2
                          .get( Integer.valueOf(message.getMessageId()));
                          if (sendMsg != null) {
                          if (callback != null) {
                          callback.messageArrived(sendMsg);
                          }
                          } else {
                          MqttPubComp pubComp = new MqttPubComp(message
                          .getMessageId());
                          this.send(pubComp, null);
                          }
                          }
                          }
                          }

                          注意,在上述方法中,会通知到回调线程处理,在回调线程CommsCallback.run()的handleActionComplete()方法中,会调用ClientState.notifyComplete()方法清除缓存,具体代码如下:

                            protected void notifyComplete(MqttToken token) throws MqttException {

                            MqttWireMessage message = token.internalTok.getWireMessage();


                            if (message != null && message instanceof MqttAck) {
                            MqttAck ack = (MqttAck) message;
                            if (ack instanceof MqttPubAck) {
                            // 如果消息为MqttPubAck,则qos1的消息流程结束,删除这条信息对应
                            // 的缓存数据


                            persistence.remove(getSendPersistenceKey(message));
                            persistence.remove(getSendBufferedPersistenceKey(message));
                            outboundQoS1.remove(Integer.valueOf(ack.getMessageId()));
                            // 将未着陆消息计数器 - 1
                            decrementInFlight();
                            // 释放messageId
                            releaseMessageId(message.getMessageId());
                            tokenStore.removeToken(message);
                            } else if (ack instanceof MqttPubComp) {
                            // 如果消息为MqttPubComp,则qos2的消息流程结束,删除这条信息对应
                            // 的缓存数据
                            persistence.remove(getSendPersistenceKey(message));
                            persistence.remove(getSendConfirmPersistenceKey(message));
                            persistence.remove(getSendBufferedPersistenceKey(message));
                            outboundQoS2.remove( Integer.valueOf(ack.getMessageId()));
                            // 将Rel类型未着陆消息计数器 - 1
                            inFlightPubRels--;
                            // 将未着陆消息计数器 - 1
                            decrementInFlight();
                            // 释放messageId
                            releaseMessageId(message.getMessageId());
                            tokenStore.removeToken(message);
                            }
                            checkQuiesceLock();
                            }
                            }

                            在CommsCallback.run()的handleMessage()方法中,会处理收到qos1和qos2消息后的回执发送操作,同时如果是qos2,会清空inboundQoS2队列中对应的Message,具体代码如下

                                private void handleMessage(MqttPublish publishMessage)
                                    throws MqttException, Exception {
                                  String destName = publishMessage.getTopicName();
                                  // 内部会清空inboundQoS2中对应的message
                                  deliverMessage(destName, publishMessage.getMessageId(),publishMessage.getMessage());
                                  // 如果是自动应答,发送对应的回执。
                              if (!this.manualAcks) {
                              if (publishMessage.getMessage().getQos() == 1) {
                              this.clientComms.internalSend(new MqttPubAck(publishMessage),
                              new MqttToken(clientComms.getClient().getClientId()));
                              } else if (publishMessage.getMessage().getQos() == 2) {
                              this.clientComms.deliveryComplete(publishMessage);
                              MqttPubComp pubComp = new MqttPubComp(publishMessage);
                              this.clientComms.internalSend(pubComp, new MqttToken(
                              clientComms.getClient().getClientId()));
                              }
                              }
                              }


                              总结

                              上述就是整个MQTTClient内部发送和接收消息的整个流程,在这里做个简单的总结


                              发送消息

                              1. 连接时会创建后台常驻的发送线程,发送线程会从对应的队列中获取数据,如果队列中没有待发送的数据,发送线程会置为等待状态。涉及到的队列如下:

                              • pendingMessage: publish消息待发送队列

                              • pendingFlow: 特殊消息待发送队列(以ACK为主),特殊消息优先级高于Publish消息

                              • outboundQoS1:qos1类型的消息出站队列

                              • outboundQoS2:qos2类型的消息出站队列


                              2. 将消息放到对应的队列后,唤醒发送线程


                              3.发送线程从消息队列中获取数据,将数据发送,同时删除发送队列(pendingFlow或pendingMessages)中对应的数据


                              4. 发送后通知回调,如果qos0的消息,释放MessageId,变更消息未着陆计数器,变更消息标识为完成状态,如果是PingReq类型的消息,自增心跳出站计数器


                              接收消息


                              1.  连接时会创建后台常驻的接收线程,接收线程会通过socket中阻塞式的获取数据。


                              2.  如果接收到ack类型的消息,判断不同的消息类型进行处理

                              • PubRec类型:回执PubRel类型消息

                              • PubACK或者PubComp类型:消息处理完成,回调通知并在ClientStat的notifyComplete()方法,根据qos从发送队列和出站队列中移除该消息

                              • PingResp类型:心跳出站计数器 - 1,回调通知

                              • ConneACK类型:重置消息未着陆计数器和PubRel消息未着陆计数器,发起心跳,重发未着陆消息。

                              3.  如果接收到非ack类型的消息,判断不同的消息类型进行处理

                              Publish类型:

                              • qos0或qos1: 执行回调CommsCallback中的handleMessage()方法,将回执发送出去

                              • qos2: 将消息持久化,并放到入站消息队列inboundQoS2中,同时发送PubRec消息

                              PubRel类型:

                              • inboundQoS2中存在该message,将调用CommsCallback的handleMessage发送PubComp,同时清除inboundQoS2中的该条Message缓存

                              • 若不存在,直接发送PubComp最终会通过CommeCallback的handleMessage方法中的deliverMessage()方法回调到我们的消息处理器中。



                              以上就是MQTTClient作为消息发送和接收的源码分析,目前我们还遗留了几个问题

                              • Client与Broker的心跳探活机制

                              • Client与Broker连接断开后的重连机制以及重连时对应的未着陆消息重发策略

                              这些内容将会在下一个章节给大家介绍,敬请期待吧~

                              文章转载自糖爸的架构师之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                              评论