暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ之消费重试机制以及代码分析

技术王老五 2021-06-21
1427

生产者消息重试

生产者在发送消息(不包含顺序发送消息)的时候,同步、异步不进行重试,oneway不进行重试

消息重试原则上可以保证消息发送成功以及不丢失,但是消息重新投递可能造成消费者重复消费,RocketMQ不保证幂等性,所以开发者如果有幂等性的要求,需要自行保证幂等

mq重试的默认值:同步需要开启重试配置:retryAnotherBrokerWhenNotStoreOK = true,默认是不开启重试

    private int retryTimesWhenSendFailed = 2;
    private int retryTimesWhenSendAsyncFailed = 2;

    也可以自行设置重试次数

      //异步重试
      producer.setRetryTimesWhenSendAsyncFailed(3);
      //同步重试
      producer.setRetryTimesWhenSendFailed(3);
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        //根据topic负载均衡算法选择一个MessageQueue
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
        beginTimestampPrev = System.currentTimeMillis();
        if (times > 0) {
        //Reset topic with namespace during resend.
        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
        }
        long costTime = beginTimestampPrev - beginTimestampFirst;
        if (timeout < costTime) {
        callTimeout = true;
        break;
        }

        //向 MessageQueue 发送消息
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
        endTimestamp = System.currentTimeMillis();
        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
        switch (communicationMode) {
        case ASYNC:
        return null;
        case ONEWAY:
        return null;
        case SYNC:
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
        //判断是否开启重复发送。默认是关闭
        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
        continue;
        }
        }

        return sendResult;
        default:
        break;
        }
        }
        }

        同步发送就是简单的for循环重新重试注意(需要 retryAnotherBrokerWhenNotStoreOK = true),异步重试是在callback中进行判断,如果response为空重试,或者在处理processSendResponse存在异常时重试

         消费者重试

        RocketMQ在消费者消费失败的时候提供重试,代码有点多,主要就是异步拉取消息(其中有一个参数pullCallBack回调函数),省略的逻辑主要是:拉取之前会进行一些阈值、消息大小以及偏移量的判断,决定是否要延迟放入队列中

          public void pullMessage(final PullRequest pullRequest) {
          //主要判断 消息的阈值、队列的大小等决定是否要延迟加入处理的队列中
          .....
          //回掉函数,★ 1
          PullCallback pullCallback = new PullCallback(){....}
          try {
          //异步拉取信息
          this.pullAPIWrapper.pullKernelImpl(
          pullRequest.getMessageQueue(),
          subExpression,
          subscriptionData.getExpressionType(),
          subscriptionData.getSubVersion(),
          pullRequest.getNextOffset(),
          this.defaultMQPushConsumer.getPullBatchSize(),
          sysFlag,
          commitOffsetValue,
          BROKER_SUSPEND_MAX_TIME_MILLIS,
          CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
          CommunicationMode.ASYNC,
          pullCallback //★ 2
          );
          } catch (Exception e) {
          log.error("pullKernelImpl exception", e);
          this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
          }
          }

          其中pullKernelImpl()这个方法调用逻辑,调用的整体逻辑

            PullAPIWrapper.pullKernelImpl()//会根据broker名称获取broker信息 设置消息头信息
            --MQClientAPIImpl.pullMessage()//根据communicationMode获取具体的调用逻辑--异步拉取消息
            --MQClientAPIImpl.pullMessageAsync()//拉取消息 回掉pullCallback
            -- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest()// 回掉方法调用submitConsumeRequest
            --ConsumeMessageConcurrentlyService.ConsumeRequest.run()//最终执行为线程池中的run方法

            其中run方法会调用我们消费端写的listene实现类,这就是mq的消费的整体逻辑

            其中status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);执行消费段写的listener,

             

            消费的成功以及失败之后的逻辑:ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);去判断消费是成功了还是失败了,失败了重新发送

                                                                                  ↑↑↑↑↑↑↑↑图例1↑↑↑↑↑↑↑↑

            如果消费成功,ackIndex是consumeRequest.getMsgs().size() - 1;就不执行for循环(for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++)),也就不执行补偿,失败ackIndex == -1,重新发送消息,

            广播模式,只打印一条日志,不进行其他的任何操作,

            集群模式:跟踪方法sendMessageBack(msg, context);

            • 首先根据 brokerName 得到 broker 地址信息,然后通过网络发送到指定的 Broker上。

            • 如果上述过程失败,则创建一条新的消息重新发送给 Broker,此时新消息的主题为重试主题:"%RETRY%" + ConsumeGroupName, 注意,这里的主题和原先的消息主题没任何关系而是和消费组相关。

            同步调用告诉服务端请求 CONSUMER_SEND_MSG_BACK,到broker中找到SendMessageProcessor的asyncConsumerSendMsgBack方法 

            通过网络发送到指定的 Broker上 

            CONSUMER_SEND_MSG_BACK,到broker中找到SendMessageProcessor的asyncConsumerSendMsgBack方法 

            获取订阅组的信息

            SubscriptionGroupConfig

            以下是主要的属性

            获取topic主题

            获取消息

            根据偏移量在commitlog中获取message信息

             

            获取延迟等级

            写入commitlog中

            主题为%RETRY%+consumerGroup,主题是基于消费者组来的,并不是基于原来的主题进行重新定义的,如果消息发送失败了,5s之后重新放入队列重新发送(  ↑↑↑↑↑↑↑↑图例1↑↑↑↑↑↑↑↑图示有失败之后的逻辑)

            这些失败的消息,直接更新偏移量,定义为已经消费的消息

              long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
              if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
              this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
              }

              整体流程

              • 根据消费结果,设置ackIndex的值。

              • 如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack,这里会创建新的消息(重试次数,延迟执行)。

              • 更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)。

              欢迎关注关注博主的公众号,可以领取 回复888 可以获取一份面试资料

               


              文章转载自技术王老五,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论