
来源:https://github.com/apache/rocketmq
broker处理拉取消息请求,第一次brokerAllowSuspend为true。
/**
 * Entry point for a pull-message request arriving on a Netty channel.
 *
 * <p>Delegates to the three-argument overload, passing {@code true} as its last argument.
 *
 * @param ctx     channel handler context of the requesting connection
 * @param request the decoded remoting command
 * @return whatever the three-argument overload returns
 * @throws RemotingCommandException propagated from the delegate
 */
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    // The article describes this third argument as "brokerAllowSuspend", true on the first attempt.
    final boolean brokerAllowSuspend = true;
    return this.processRequest(ctx.channel(), request, brokerAllowSuspend);
}
判断当前broker是否可读,是否有消费组的订阅信息,消费组是否可被消费。
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));return response;}SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());if (null == subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));return response;}if (!subscriptionGroupConfig.isConsumeEnable()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());return response;}
获取各个标志位,是否可暂停,是否提交偏移量,是否重新订阅,暂停时间
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
获取topic的信息,topic是否存在,是否可读,请求读取的队列id是否在可读队列的区间内,队列id从0开始。
// Look up the topic configuration; an unknown topic is rejected.
TopicConfig topicConfig =
    this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (topicConfig == null) {
    log.error("the topic {} not exist, consumer: {}",
        requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark(String.format(
        "topic[%s] not exist, apply first please! %s",
        requestHeader.getTopic(),
        FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
    return response;
}

// The topic must carry read permission.
final boolean topicReadable = PermName.isReadable(topicConfig.getPerm());
if (!topicReadable) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
    return response;
}

// Valid queue ids are 0 .. readQueueNums - 1.
if (!(requestHeader.getQueueId() >= 0
    && requestHeader.getQueueId() < topicConfig.getReadQueueNums())) {
    String illegalQueueInfo = String.format(
        "queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
        requestHeader.getQueueId(),
        requestHeader.getTopic(),
        topicConfig.getReadQueueNums(),
        channel.remoteAddress());
    log.warn(illegalQueueInfo);
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark(illegalQueueInfo);
    return response;
}
检查是否有消费组信息以及订阅等信息是否合法。下面的大部分校验是针对订阅的过滤表达式为非tag类型时的情况。
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;

if (!hasSubscriptionFlag) {
    // No subscription in the request: fall back to what the broker has registered for the group.
    ConsumerGroupInfo consumerGroupInfo = this.brokerController
        .getConsumerManager()
        .getConsumerGroupInfo(requestHeader.getConsumerGroup());
    if (consumerGroupInfo == null) {
        log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
        response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
        response.setRemark("the consumer's group info not exist"
            + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
        return response;
    }

    // Broadcasting consumers are rejected when the group config does not allow broadcast.
    if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
        && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the consumer group[" + requestHeader.getConsumerGroup()
            + "] can not consume by broadcast way");
        return response;
    }

    subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
    if (subscriptionData == null) {
        log.warn("the consumer's subscription not exist, group: {}, topic:{}",
            requestHeader.getConsumerGroup(), requestHeader.getTopic());
        response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
        response.setRemark("the consumer's subscription not exist"
            + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
        return response;
    }

    // The broker-side subscription must be at least as new as the version the client sent.
    if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
        log.warn("The broker's subscription is not latest, group: {} {}",
            requestHeader.getConsumerGroup(), subscriptionData.getSubString());
        response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
        response.setRemark("the consumer's subscription not latest");
        return response;
    }

    // Non-tag expressions additionally need registered, up-to-date consumer filter data.
    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        consumerFilterData = this.brokerController
            .getConsumerFilterManager()
            .get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
        if (null == consumerFilterData) {
            response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
            response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
            return response;
        }
        if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
            log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                requestHeader.getConsumerGroup(),
                requestHeader.getTopic(),
                consumerFilterData.getClientVersion(),
                requestHeader.getSubVersion());
            response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
            response.setRemark("the consumer's consumer filter data not latest");
            return response;
        }
    }
} else {
    // The request carries its own subscription: build the subscription (and filter data) from it.
    try {
        subscriptionData = FilterAPI.build(
            requestHeader.getTopic(),
            requestHeader.getSubscription(),
            requestHeader.getExpressionType());
        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = ConsumerFilterManager.build(
                requestHeader.getTopic(),
                requestHeader.getConsumerGroup(),
                requestHeader.getSubscription(),
                requestHeader.getExpressionType(),
                requestHeader.getSubVersion());
            assert consumerFilterData != null;
        }
    } catch (Exception e) {
        log.warn("Parse the consumer's subscription[{}] failed, group: {}",
            requestHeader.getSubscription(), requestHeader.getConsumerGroup());
        response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
        response.setRemark("parse the consumer's subscription failed");
        return response;
    }
}

// Non-tag filtering is only served when the broker has property filtering enabled.
if (!(ExpressionType.isTagType(subscriptionData.getExpressionType())
    || this.brokerController.getBrokerConfig().isEnablePropertyFilter())) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("The broker does not support consumer to filter message by "
        + subscriptionData.getExpressionType());
    return response;
}
消息过滤类默认为ExpressionMessageFilter
// whether do filter when retry.private boolean filterSupportRetry = false;MessageFilter messageFilter;if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());} else {messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());}
获取消息
// Pull a batch from the message store for this group/topic/queue, starting at the requested
// queue offset and applying the filter chosen earlier.
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
    requestHeader.getConsumerGroup(),
    requestHeader.getTopic(),
    requestHeader.getQueueId(),
    requestHeader.getQueueOffset(),
    requestHeader.getMaxMsgNums(),
    messageFilter);
判断当前状态是否可读正常,获取最大的消息偏移量,查找对应的消息消费队列
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,final int maxMsgNums,final MessageFilter messageFilter) {if (this.shutdown) {log.warn("message store has shutdown, so getMessage is forbidden");return null;}if (!this.runningFlags.isReadable()) {log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());return null;}long beginTime = this.getSystemClock().now();GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;long nextBeginOffset = offset;long minOffset = 0;long maxOffset = 0;GetMessageResult getResult = new GetMessageResult();final long maxOffsetPy = this.commitLog.getMaxOffset();ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);if (consumeQueue != null) {minOffset = consumeQueue.getMinOffsetInQueue();maxOffset = consumeQueue.getMaxOffsetInQueue();if (maxOffset == 0) {status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);} else if (offset < minOffset) {status = GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else if (offset == maxOffset) {status = GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset = nextOffsetCorrection(offset, offset);} else if (offset > maxOffset) {status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 == minOffset) {nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset = nextOffsetCorrection(offset, maxOffset);}} else {SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);if (bufferConsumeQueue != null) {try {status = GetMessageStatus.NO_MATCHED_MESSAGE;long nextPhyFileStartOffset = Long.MIN_VALUE;long maxPhyOffsetPulling = 0;int i = 0;final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();ConsumeQueueExt.CqExtUnit cqExtUnit = new 
ConsumeQueueExt.CqExtUnit();for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();int sizePy = bufferConsumeQueue.getByteBuffer().getInt();long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();maxPhyOffsetPulling = offsetPy;if (nextPhyFileStartOffset != Long.MIN_VALUE) {if (offsetPy < nextPhyFileStartOffset)continue;}boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),isInDisk)) {break;}boolean extRet = false, isTagsCodeLegal = true;if (consumeQueue.isExtAddr(tagsCode)) {extRet = consumeQueue.getExt(tagsCode, cqExtUnit);if (extRet) {tagsCode = cqExtUnit.getTagsCode();} else {// can't find ext content.Client will filter messages by tag also.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",tagsCode, offsetPy, sizePy, topic, group);isTagsCodeLegal = false;}}if (messageFilter != null&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? 
cqExtUnit : null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}continue;}SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);if (null == selectResult) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.MESSAGE_WAS_REMOVING;}nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);continue;}if (messageFilter != null&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}// release...selectResult.release();continue;}this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();getResult.addMessage(selectResult);status = GetMessageStatus.FOUND;nextPhyFileStartOffset = Long.MIN_VALUE;}if (diskFallRecorded) {long fallBehind = maxOffsetPy - maxPhyOffsetPulling;brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);}nextBeginOffset = offset + (i ConsumeQueue.CQ_STORE_UNIT_SIZE);long diff = maxOffsetPy - maxPhyOffsetPulling;long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() 100.0));getResult.setSuggestPullingFromSlave(diff > memory);} finally {bufferConsumeQueue.release();}} else {status = GetMessageStatus.OFFSET_FOUND_NULL;nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "+ maxOffset + ", but access logic queue failed.");}}} else {status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);}if (GetMessageStatus.FOUND == status) {this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();} else {this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();}long eclipseTime = this.getSystemClock().now() - 
beginTime;this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);getResult.setStatus(status);getResult.setNextBeginOffset(nextBeginOffset);getResult.setMaxOffset(maxOffset);getResult.setMinOffset(minOffset);return getResult;}
获取队列的最小、最大下标并与请求的下标进行比较:当maxOffset == 0时,代表还没有消息产生,下一个拉取下标设置为0;当offset < minOffset时,说明下标太小,重置为minOffset;当offset == maxOffset时,说明请求的下标刚好超过已有消息一位(已经消费到最新位置),保持为offset;当offset > maxOffset时,说明下标超过最大下标太多,属于非法越界,minOffset为0时重置为0,否则重置为maxOffset。当本broker为从节点且offsetCheckInSlave为false时,以上各种情况都不做纠正,下一个拉取下标保持为请求的offset。
默认是offsetCheckInSlave = falseprivate long nextOffsetCorrection(long oldOffset, long newOffset) {long nextOffset = oldOffset;if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {nextOffset = newOffset;}return nextOffset;}
当消息在正常有消息的范围内的话,获取消息存储的结果,算出最大过滤消息大小,因为存储的时候只是根据topic和queueId存储,但是有多个tag的话,这次只需要一个tag的消息的话,就需要从很多消息中过滤出自己想要的消息。校验消息存储的偏移量是否正确,检查消息是否在磁盘中,判断获取的消息是否达到限制,数量大小。
// Transfer-limit defaults quoted by the article alongside the method.
// NOTE(review): presumably these are the config fields behind the getters used below - confirm.
private int maxTransferBytesOnMessageInMemory = 1024 * 256;
private int maxTransferCountOnMessageInMemory = 32;
private int maxTransferBytesOnMessageInDisk = 1024 * 64;
private int maxTransferCountOnMessageInDisk = 8;

/**
 * Tells whether the batch being assembled must stop before adding the next message.
 *
 * @param sizePy       size in bytes of the candidate message
 * @param maxMsgNums   maximum number of messages requested
 * @param bufferTotal  bytes already collected in the batch
 * @param messageTotal messages already collected in the batch
 * @param isInDisk     whether the candidate was classified as on disk rather than in memory
 * @return {@code true} if the batch is full, {@code false} if the candidate may still be added
 */
private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
    // An empty batch is never full, so at least one message can always be returned.
    final boolean batchIsEmpty = bufferTotal == 0 || messageTotal == 0;
    if (batchIsEmpty) {
        return false;
    }

    // The caller's own message-count limit has been reached.
    if (messageTotal >= maxMsgNums) {
        return true;
    }

    // Byte and count limits differ for disk and memory reads; the count limit is only
    // consulted when the byte limit is not exceeded.
    final int bytesWithCandidate = bufferTotal + sizePy;
    if (isInDisk) {
        return bytesWithCandidate > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()
            || messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1;
    }
    return bytesWithCandidate > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()
        || messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1;
}
文章转载自徘徊笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




