生产者消息重试
private int retryTimesWhenSendFailed = 2;private int retryTimesWhenSendAsyncFailed = 2;
//异步重试producer.setRetryTimesWhenSendAsyncFailed(3);//同步重试producer.setRetryTimesWhenSendFailed(3);
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();//根据topic负载均衡算法选择一个MessageQueueMessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}//向 MessageQueue 发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {//判断是否开启重复发送。默认是关闭if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}}}
消费者重试
public void pullMessage(final PullRequest pullRequest) {//主要判断 消息的阈值、队列的大小等决定是否要延迟加入处理的队列中.....//回掉函数,★ 1PullCallback pullCallback = new PullCallback(){....}try {//异步拉取信息this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback //★ 2);} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}}
PullAPIWrapper.pullKernelImpl()//会根据broker名称获取broker信息 设置消息头信息--MQClientAPIImpl.pullMessage()//根据communicationMode获取具体的调用逻辑--异步拉取消息--MQClientAPIImpl.pullMessageAsync()//拉取消息 回掉pullCallback-- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest()// 回掉方法调用submitConsumeRequest--ConsumeMessageConcurrentlyService.ConsumeRequest.run()//最终执行为线程池中的run方法
首先根据 brokerName 得到 broker 地址信息,然后通过网络发送到指定的 Broker上。
如果上述过程失败,则创建一条新的消息重新发送给 Broker,此时新消息的主题为重试主题:"%RETRY%" + ConsumeGroupName, 注意,这里的主题和原先的消息主题没任何关系而是和消费组相关。
获取订阅组的信息
获取topic主题
获取消息
获取延迟等级
写入commitlog中
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
整体流程
根据消费结果,设置ackIndex的值。
如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack,这里会创建新的消息(重试次数,延迟执行)。
更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)。
文章转载自技术王老五,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。



















