暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ源码分析之Broker处理拉取消息请求(1)

徘徊笔记 2019-05-01
1019

来源:https://github.com/apache/rocketmq


broker处理拉取消息请求,第一次brokerAllowSuspend为true。

    /**
     * Entry point for a pull request arriving on a Netty channel.
     *
     * <p>Delegates to the three-argument overload, passing {@code true} as the last
     * argument so that the first handling of a request is allowed to suspend.
     *
     * @param ctx     channel handler context of the incoming request
     * @param request the remoting command to handle
     * @return the response produced by the three-argument overload
     * @throws RemotingCommandException propagated from the three-argument overload
     */
    public RemotingCommand processRequest(final ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        // First pass over a request: suspension is permitted.
        final boolean brokerAllowSuspend = true;
        return this.processRequest(ctx.channel(), request, brokerAllowSuspend);
    }


    判断当前broker是否可读,是否有消费组的订阅配置信息,以及该消费组是否允许消费。

      // Guard 1: the broker's own permission bits must allow reads; otherwise answer NO_PERMISSION.
      if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
      response.setCode(ResponseCode.NO_PERMISSION);
      response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
      return response;
      }


      // Guard 2: the consumer group named in the request header must have a subscription group config.
      SubscriptionGroupConfig subscriptionGroupConfig =
      this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
      if (null == subscriptionGroupConfig) {
      response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
      response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
      return response;
      }


      // Guard 3: consumption must be enabled for that group; otherwise answer NO_PERMISSION.
      if (!subscriptionGroupConfig.isConsumeEnable()) {
      response.setCode(ResponseCode.NO_PERMISSION);
      response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
      return response;
      }


      获取各个标志位,是否可暂停,是否提交偏移量,是否重新订阅,暂停时间

        // Decode the individual flags carried in the request header's sysFlag value.
        final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
        final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());


        // The client-supplied suspend timeout is used only when the suspend flag is set; otherwise 0.
        final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;


        获取topic的信息,topic是否存在,是否可读,请求读取的队列id是否在可读队列的区间内,队列id从0开始。

          // The requested topic must be known to this broker's topic config manager.
          TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
          if (null == topicConfig) {
          log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
          response.setCode(ResponseCode.TOPIC_NOT_EXIST);
          response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
          return response;
          }


          // The topic's permission bits must allow reads.
          if (!PermName.isReadable(topicConfig.getPerm())) {
          response.setCode(ResponseCode.NO_PERMISSION);
          response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
          return response;
          }


          // Valid queue ids are 0 .. readQueueNums - 1; anything else is answered with SYSTEM_ERROR.
          if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
          String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
          requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
          log.warn(errorInfo);
          response.setCode(ResponseCode.SYSTEM_ERROR);
          response.setRemark(errorInfo);
          return response;
          }


          检查是否有消费组信息,以及订阅等信息是否合法。下面的大部分校验针对的是订阅的过滤表达式为非tag类型时的情况。

            // Resolve the subscription (and, for non-tag expressions, the consumer filter data)
            // either from the request itself or from what the broker already holds for the group.
            SubscriptionData subscriptionData = null;
            ConsumerFilterData consumerFilterData = null;
            if (hasSubscriptionFlag) {
            // The request carries its own subscription expression: build everything from the header.
            try {
            subscriptionData = FilterAPI.build(
            requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
            );
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            // Non-tag expression types additionally need consumer filter data.
            consumerFilterData = ConsumerFilterManager.build(
            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
            requestHeader.getExpressionType(), requestHeader.getSubVersion()
            );
            assert consumerFilterData != null;
            }
            } catch (Exception e) {
            // Any failure while building is reported to the client as SUBSCRIPTION_PARSE_FAILED.
            log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
            requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
            }
            } else {
            // No subscription in the request: fall back to the group info held by the consumer manager.
            ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
            if (null == consumerGroupInfo) {
            log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
            }


            // A group consuming in BROADCASTING mode is rejected unless broadcast consumption is enabled for it.
            if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
            }


            // The group must hold a subscription for the requested topic.
            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
            if (null == subscriptionData) {
            log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
            }


            // The broker-side subscription version must not be older than the version the client sent.
            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
            log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
            subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
            }
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            // Non-tag expressions: the filter data must exist on the broker and be at least as new as the client's version.
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
            requestHeader.getConsumerGroup());
            if (consumerFilterData == null) {
            response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
            response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
            return response;
            }
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
            log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
            response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
            response.setRemark("the consumer's consumer filter data not latest");
            return response;
            }
            }
            }


            // Non-tag expression types are only accepted when property filtering is enabled in the broker config.
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
            && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
            return response;
            }


            消息过滤类默认为ExpressionMessageFilter

              // Whether filtering is also applied on retry; defaults to false.
              private boolean filterSupportRetry = false;


              // Choose the filter implementation from the broker config's filterSupportRetry switch.
              // Both variants are built from the same subscription data, filter data and filter manager.
              MessageFilter messageFilter;
              if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
              messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
              this.brokerController.getConsumerFilterManager());
              } else {
              messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
              this.brokerController.getConsumerFilterManager());
              }


              获取消息

                // Ask the message store for messages of (group, topic, queueId) starting at the requested
                // queue offset, limited by maxMsgNums and screened by the filter chosen earlier.
                final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);


                判断消息存储是否已经关闭、当前是否可读,获取commitLog的最大物理偏移量,查找对应的消息消费队列

                  public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
                  final int maxMsgNums,
                  final MessageFilter messageFilter) {
                  if (this.shutdown) {
                  log.warn("message store has shutdown, so getMessage is forbidden");
                  return null;
                  }


                  if (!this.runningFlags.isReadable()) {
                  log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
                  return null;
                  }


                  long beginTime = this.getSystemClock().now();


                  GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                  long nextBeginOffset = offset;
                  long minOffset = 0;
                  long maxOffset = 0;


                  GetMessageResult getResult = new GetMessageResult();


                  final long maxOffsetPy = this.commitLog.getMaxOffset();


                  ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
                  if (consumeQueue != null) {
                  minOffset = consumeQueue.getMinOffsetInQueue();
                  maxOffset = consumeQueue.getMaxOffsetInQueue();


                  if (maxOffset == 0) {
                  status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                  nextBeginOffset = nextOffsetCorrection(offset, 0);
                  } else if (offset < minOffset) {
                  status = GetMessageStatus.OFFSET_TOO_SMALL;
                  nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                  } else if (offset == maxOffset) {
                  status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                  nextBeginOffset = nextOffsetCorrection(offset, offset);
                  } else if (offset > maxOffset) {
                  status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                  if (0 == minOffset) {
                  nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                  } else {
                  nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                  }
                  } else {
                  SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                  if (bufferConsumeQueue != null) {
                  try {
                  status = GetMessageStatus.NO_MATCHED_MESSAGE;


                  long nextPhyFileStartOffset = Long.MIN_VALUE;
                  long maxPhyOffsetPulling = 0;


                  int i = 0;
                  final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                  final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                  ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                  for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                  long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                  int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                  long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();


                  maxPhyOffsetPulling = offsetPy;


                  if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                  if (offsetPy < nextPhyFileStartOffset)
                  continue;
                  }


                  boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);


                  if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                  isInDisk)) {
                  break;
                  }


                  boolean extRet = false, isTagsCodeLegal = true;
                  if (consumeQueue.isExtAddr(tagsCode)) {
                  extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                  if (extRet) {
                  tagsCode = cqExtUnit.getTagsCode();
                  } else {
                  // can't find ext content.Client will filter messages by tag also.
                  log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                  tagsCode, offsetPy, sizePy, topic, group);
                  isTagsCodeLegal = false;
                  }
                  }


                  if (messageFilter != null
                  && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                  if (getResult.getBufferTotalSize() == 0) {
                  status = GetMessageStatus.NO_MATCHED_MESSAGE;
                  }


                  continue;
                  }


                  SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                  if (null == selectResult) {
                  if (getResult.getBufferTotalSize() == 0) {
                  status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                  }


                  nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                  continue;
                  }


                  if (messageFilter != null
                  && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                  if (getResult.getBufferTotalSize() == 0) {
                  status = GetMessageStatus.NO_MATCHED_MESSAGE;
                  }
                  // release...
                  selectResult.release();
                  continue;
                  }


                  this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                  getResult.addMessage(selectResult);
                  status = GetMessageStatus.FOUND;
                  nextPhyFileStartOffset = Long.MIN_VALUE;
                  }


                  if (diskFallRecorded) {
                  long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                  brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                  }


                  nextBeginOffset = offset + (i ConsumeQueue.CQ_STORE_UNIT_SIZE);


                  long diff = maxOffsetPy - maxPhyOffsetPulling;
                  long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                  * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() 100.0));
                  getResult.setSuggestPullingFromSlave(diff > memory);
                  } finally {


                  bufferConsumeQueue.release();
                  }
                  } else {
                  status = GetMessageStatus.OFFSET_FOUND_NULL;
                  nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                  log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                  + maxOffset + ", but access logic queue failed.");
                  }
                  }
                  } else {
                  status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
                  nextBeginOffset = nextOffsetCorrection(offset, 0);
                  }


                  if (GetMessageStatus.FOUND == status) {
                  this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
                  } else {
                  this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
                  }
                  long eclipseTime = this.getSystemClock().now() - beginTime;
                  this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);


                  getResult.setStatus(status);
                  getResult.setNextBeginOffset(nextBeginOffset);
                  getResult.setMaxOffset(maxOffset);
                  getResult.setMinOffset(minOffset);
                  return getResult;
                  }


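                  获取队列的最小、最大下标,并与请求的下标进行比较:当maxOffset == 0时,代表没有消息产生,下一个拉取下标设置为0;当offset < minOffset时,说明下标太小,重置为minOffset;当offset == maxOffset时,说明刚好超过最大下标一位(暂时没有新消息),重置为offset;当offset > maxOffset时,说明下标超过最大下标太多,属于非法超过主broker的最大下标,此时若minOffset为0则重置为0,否则重置为maxOffset。以上重置只在本broker不是从节点、或者offsetCheckInSlave为true时才生效;当本broker为从节点且offsetCheckInSlave为false时,下一个拉取下标一律保持为请求的offset。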

                    默认是offsetCheckInSlave = false


                    /**
                     * Decides which offset the client should pull from next.
                     *
                     * <p>The corrected offset is used unless this broker is a slave and offset
                     * checking on slaves is switched off; in that case the original offset is kept.
                     *
                     * @param oldOffset the offset the client asked for
                     * @param newOffset the corrected offset proposed by the caller
                     * @return {@code newOffset} when correction applies, otherwise {@code oldOffset}
                     */
                    private long nextOffsetCorrection(long oldOffset, long newOffset) {
                        final boolean runningAsSlave = this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
                        // Slave without offset checking: leave the client's offset untouched.
                        if (runningAsSlave && !this.getMessageStoreConfig().isOffsetCheckInSlave()) {
                            return oldOffset;
                        }
                        return newOffset;
                    }


                    当消息在正常有消息的范围内的话,获取消息存储的结果,算出最大过滤消息大小,因为存储的时候只是根据topic和queueId存储,但是有多个tag的话,这次只需要一个tag的消息的话,就需要从很多消息中过滤出自己想要的消息。校验消息存储的偏移量是否正确,检查消息是否在磁盘中,判断获取的消息是否达到限制,数量大小。

                      // Default per-pull transfer limits. NOTE(review): presumably these are the config fields
                      // behind the getMaxTransfer* getters that isTheBatchFull reads from messageStoreConfig — confirm.
                      // Byte limit, in-memory case: 1024 * 256 = 262144.
                      private int maxTransferBytesOnMessageInMemory = 1024 * 256;
                      // Message-count limit, in-memory case.
                      private int maxTransferCountOnMessageInMemory = 32;
                      // Byte limit, on-disk case: 1024 * 64 = 65536.
                      private int maxTransferBytesOnMessageInDisk = 1024 * 64;
                      // Message-count limit, on-disk case.
                      private int maxTransferCountOnMessageInDisk = 8;


                      /**
                       * Tells whether the batch collected so far must be closed before adding one more message.
                       *
                       * <p>An empty batch is never full. Otherwise the batch is full when the requested
                       * message count has been reached, or when adding the next message would exceed the
                       * configured byte limit, or when the configured count limit has been reached. Separate
                       * limits are read from the store config for the on-disk and in-memory cases.
                       *
                       * @param sizePy       size in bytes of the message about to be added
                       * @param maxMsgNums   maximum number of messages requested by the caller
                       * @param bufferTotal  bytes collected so far
                       * @param messageTotal messages collected so far
                       * @param isInDisk     whether the message about to be added is considered on disk
                       * @return {@code true} if no further message should be added to the batch
                       */
                      private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
                          // Nothing collected yet: the first message is always accepted.
                          if (0 == bufferTotal || 0 == messageTotal) {
                              return false;
                          }

                          // The caller's own limit has been reached.
                          if (maxMsgNums <= messageTotal) {
                              return true;
                          }

                          final int projectedBytes = bufferTotal + sizePy;
                          if (isInDisk) {
                              return projectedBytes > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()
                                  || messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1;
                          }

                          return projectedBytes > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()
                              || messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1;
                      }


                      文章转载自徘徊笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论