暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ之顺序消费逻辑

不修边幅的创客 2020-09-25
783
点击上方蓝色字体,选择“设为星标”



1

生产者保证消息的投递顺序

顺序消息消费是指消息队列中消息之间有先后的依赖关系,后一条消息的处理依赖于前一条消息的处理结果,比如:一个订单的顺序流程是:创建、付款、推送、完成。在处理时不可能乱了顺序

一、默认情况下消息的投递方式

默认情况下,RocketMQ生产者生产的消息在所有的MessageQueue中轮询选择一个,进行投递,也就是说同一个生产者的消息被发送到不同的MessageQueue,在被消费者消费时,就无法保证消息被顺序消费,默认投递的MessageQueue的核心代码如下:


private SendResult sendDefaultImpl(//
            Message msg,//
            final CommunicationMode communicationMode,//
            final SendCallback sendCallback, final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

            //轮询选择一个MessageQueue进行投递
            MessageQueue tmpmq = topicPublishInfo.

public class TopicPublishInfo {

    //MessageQueue列表
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //记录当前消费实例发送消息 原子递增
    private AtomicInteger sendWhichQueue = new AtomicInteger(0);

    //....
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //lastBrokerName是上一次发送失败的broker,规避上次失败的broker,不再其上面发送
        if (lastBrokerName != null) {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }

            return null;
        }
        else {
            //原子性的递增
            int index = this.sendWhichQueue.getAndIncrement();
            //对MessageQueue列表求余
            int pos = Math.abs(index) % this.messageQueueList.size();
            //获取需要发送的MessageQueue
            return this.messageQueueList.get(pos);
        }
    }
}


由于消费者是绑定不同的队列进行消费,如果生产者采用这种轮询的方式进行生产,受消费者实例的处理能力的影响,肯定就没办法控制消息消费的先后了

二、生产端如何发送顺序消息

RocketMQ在设计的时候考虑了这种顺序消息的场景,所以在生产者的时候,允许按照某种规则对消息进行设置,在这种规则的驱动下,可以控制某一类消息按先后顺序进入某一个指定的MessageQueue,这样消费者就有能力按顺序处理消息

MQProducer接口定义了如下接口,使用者需要自己实现MessageQueueSelector的select接口,即可实现消息按规则发送,代码如下:


SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException
;

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}


拿官网的例子举例,代码如下:

public class Producer {
    public static void main(String[] args) {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();
            String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
            for (int i = 0; i < 100; i++) {
                // 订单ID相同的消息要有序
                int orderId = i % 10;
                Message msg =
                        new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes());
                //通过订单ID对MessageQueue进行Hash,即可把相同订单ID的消息发送到同一个MessageQueue
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
{
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.println(sendResult);
            }
            producer.shutdown();
        }
        catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}


在发送消息时,通过调用select获取MessageQueue进行消息的发送,代码如下:

private SendResult sendSelectImpl(//
            Message msg,//
            MessageQueueSelector selector,//
            Object arg,//
            final CommunicationMode communicationMode,//
            final SendCallback sendCallback, final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

                //调用selector.select获取MessageQueue进行发送
                mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);

    }


消息发送已经按照预设好的方式保证了消息的顺序性,但是只是生产者保证顺序性就一定能保证消息是按严格顺序方式消费吗?这个是不一定的,默认情况下同一个消费者实例是多线程处理消息的,也就是每一个消息可能是同一个消费实例线程池中的不同线程处理,那如何保证消息被消费的顺序性呢 ? 接下来继续讲消费者保证消息的顺序消费 .


2

消费者保证消息的消费顺序

很多人立刻想到,如果把消费端实例的线程池里面线程数设置成1,是不是就可以了呢,这个当然是没有问题的,如果是使用MessageListenerConcurrently,则需要把线程池改为单线程模式,但不是很专业.假如有一个消息消费失败,可能会阻塞后面的所有消息的消费。接下来讲一个消费者专业的处理方式

消费端提供了专门处理顺序消息的方法,这个方法就要涉及到MessageListenerOrderly,针对上面的生产,我们使用该方法来消费的话,代码为:

consumer.registerMessageListener(new MessageListenerOrderly() {

           Random random = new Random();

           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                   System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }

               try {
                   //模拟业务逻辑处理中...
                   TimeUnit.SECONDS.sleep(random.nextInt(10));
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });

       consumer.start();


如果MessageListener的实现是MessageListenerOrderly,DefaultMQPushConsumerImpl里面的属性consumeMessageService为ConsumeMessageOrderlyService,所有的顺序消费的逻辑都在这个类中处理.当消费者启动时,会调用start方法,该方法会启动一个每隔20秒的定时任务,该任务是锁定所有的MessageQueue,代码如下:

public void start() {
        // 启动定时lock队列服务
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl
            .messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run()
{
                    //延迟一秒,时间间隔为20秒,执行rebalanceImpl的lockAll()方法。
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
        }
    }


public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            //这个方法会锁定相关broker下面的相关的messagequeue对应的processQueue,这个地方下面会用到
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }


继续调用RebalanceImpl.lockAll()方法对所有broker上的当前topic的MessageQueue进行锁定(所谓锁定,就是把MessageQueue对应的ProcessQueue的成员变量locked设置为true),如果锁定失败,将暂停当前MessageQueue的拉取,直到下个定时任务重新锁定

public void lockAll() {
        //根据当前负载的消息队列,按照 Broker分类存储在Map。
        //负载的消息队列在RebalanceService时根据当前消费者数量与消息消费队列按照负载算法进行分配,然后尝试对该消息队列加锁,如果申请锁成功,则加入到待拉取任务中
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();

        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<MessageQueue>> entry = it.next();
            final String brokerName = entry.getKey();
            final Set<MessageQueue> mqs = entry.getValue();

            if (mqs.isEmpty())
                continue;

            //根据Broker获取主节点的地址。
            FindBrokerResult findBrokerResult =
                    this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
            if (findBrokerResult != null) {
                LockBatchRequestBody requestBody = new LockBatchRequestBody();
                requestBody.setConsumerGroup(this.consumerGroup);
                requestBody.setClientId(this.mQClientFactory.getClientId());
                requestBody.setMqSet(mqs);

                try {
                    //向Broker发送锁定消息队列请求,该方法会返回本次成功锁定的消息消费队列,关于Broker端消息队列锁定实现见下文详细分析。
                    Set<MessageQueue> lockOKMQSet =
                            this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(
                                    findBrokerResult.getBrokerAddr(), requestBody, 1000);

                    //遍历本次成功锁定的队列来更新对应的ProcessQueue的locked状态,如果locked为false,则设置成true,并更新锁定时间。
                    for (MessageQueue mq : lockOKMQSet) {
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        if (processQueue != null) {
                            if (!processQueue.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                            }
                            //processQueue被锁定标识和锁定时间
                            processQueue.setLocked(true);
                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
                        }
                    }
                    //遍历mqs,如果消息队列未成功锁定,需要将ProcessQueue的locked状态为false,在该处理队列未被其他消费者锁定之前,该消息队列将暂停拉取消息。
                    for (MessageQueue mq : mqs) {
                        if (!lockOKMQSet.contains(mq)) {
                            ProcessQueue processQueue = this.processQueueTable.get(mq);
                            if (processQueue != null) {
                                processQueue.setLocked(false);
                                log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup,
                                        mq);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + mqs, e);
                }
            }
        }
    }


为什么会有这个锁定呢 ? 顺序消费的时候使用,消费之前会判断一下ProcessQueue锁定时间是否超过阈值(默认30000ms),如果没有超时,代表还是持有锁

先稍微讲一下消费逻辑,首先RebalancePushImpl.dispatchPullRequest方法把当前消费者实例负责的MessageQueue及对应的ProcessQueue封装成了pullRequest进行分发,会被分发到一个PullMessageService.pullRequestQueue的阻塞链表队列中,然后PullMessageService线程不停从该队列中获取请求(没有就阻塞)拉取消息,在DefaultMQPushConsumerImpl.pullMessage中执行真正的消费逻辑这里会不停的拉取(当然有限流),拉取到消息后被异步处理,但是请放心异步处理也会按顺序提交consumeOffset,不会造成Offset大的消息先消费的情况,这里我们异步处理类及方法:

PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {

     ...
     switch (pullResult.getPullStatus()) {
             case FOUND:

               ...
               //拉取到的消息,会被放到对应的processQueue中,processQueue会做相应的顺序消费控制
               //这里注意:如果返回true,才可以往线程池添加消费请求
               boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
               //然后在把当前结果封装成ConsumeRequest放到消费者线程池中处理
               DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                pullResult.getMsgFoundList(), //
                                processQueue, //
                                pullRequest.getMessageQueue(), //
                                dispathToConsume);
}


ProcessQueue.putMessage是互斥的,不能同时写入,写入时会把消息转存到内部的TreeMap中,key是当前MessageQueue的queueOffset,value是MessageExt,并且是基于queueOffset排好序的消息,代码如下:

public boolean putMessage(final List<MessageExt> msgs) {
        boolean dispatchToConsume = false;
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {
                        validMsgCnt++;
                        this.queueOffsetMax = msg.getQueueOffset();
                    }
                }
                //msgCount记录为消费的消息
                msgCount.addAndGet(validMsgCnt);

                // 如果ProcessQueue有需要处理的消息(从上可知,如果msgs不为空那么msgTreeMap不为空)
                // 如果consuming为false,将其设置为true,表示正在消费
                // 这个值在放消息的时候会设置为true,在顺序消费模式,取不到消息则设置为false
                if (!msgTreeMap.isEmpty() && !this.consuming) {
                    // 有消息,且为未消费状态,则顺序消费模式可以消费
                    dispatchToConsume = true;
                    this.consuming = true;
                }

                if (!msgs.isEmpty()) {
                    MessageExt messageExt = msgs.get(msgs.size() - 1);
                    // property为ConsumeQueue里最大的offset
                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                    if (property != null) {
                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                        if (accTotal > 0) {// 当前消息的offset与最大消息的差值,相当于还有多少offset没有消费
                            this.msgAccCnt = accTotal;
                        }
                    }
                }
            }
            finally {
                this.lockTreeMap.writeLock().unlock();
            }
        }
        catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }

        return dispatchToConsume;
    }

@Override
public void submitConsumeRequest(//
        final List<MessageExt> msgs, //
        final ProcessQueue processQueue, //
        final MessageQueue messageQueue, //
        final boolean dispathToConsume) {
    if (dispathToConsume) {
         //封装成ConsumeRequest,由线程池里的线程处理消息的消费
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}


ConsumeRequest的run方法执行真正的消费逻辑,一边介绍消费逻辑,一边看代码吧.

1、processQueue不能被dropped,示被废弃了;

//如果消息队列状态为 dropped 为true,则停止本次消息消费。
if (this.processQueue.isDropped()) {
   log.warn("run, the message queue not be able to consume, because it's dropped. {}",
                    this.messageQueue);
    return;
}


2、处理时,对当前MessageQueue加锁,也就是说,即使有多个线程处理同一个MessageQueue的消息,也只会有一个线程对MessageQueue这个对象进行处理,其它线程等待(注意:不同MessageQueue不会阻塞,可以并行消费),这里有人会想,如果这个线程释放了锁,下一个拿到锁对线程会不会先消费QueueOffset大的消息先消费呢 ? 答案是肯定不会,因为:下一个拿到锁对线程,也是对ProcessQueue同一个人对象里的消费进行处理,记得哈,是同一个ProcessQueue,并且内部的消息是通过QueueOffset排好序的.

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
      //当前messageQueue开始消费
}


3、如果当前ProcessQueue的锁还在,并且没有失效则可以消费;

if (MessageModel.BROADCASTING
                    .equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
    
    //循环对当前MessageQueue进行消费
}


4、for(boolean continueConsume = true;continueConsume)这个循环很有意思,表示当前MessageQueue是否可以持续消费,是否可以继续消费的标准有两个:  a、是在consumeOffset的提交,如果提交成功,就可以继续消费,直到processQueue中msgTreeMap为空; b、consumeOffset没有正常提交.比如其它状态:SUSPEND_CURRENT_QUEUE_A_MOMENT

for (boolean continueConsume = true; continueConsume;) {
    //循环消费processQueue中的消息
}


case SUSPEND_CURRENT_QUEUE_A_MOMENT:
    //重新消费该批消息
    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
    this.submitConsumeRequestLater(//
            consumeRequest.getProcessQueue(), //
            consumeRequest.getMessageQueue(), //
            context.getSuspendCurrentQueueTimeMillis());
    //停止当前messageQueue循环消费
    continueConsume = false;

    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
            consumeRequest.getMessageQueue().getTopic(), msgs.size());
    break;


5、次判断processQueue的状态,不能drop,锁定状态,锁没有过期;否则就重新把messagequeue和processqueue包装成consumerequest放到线程池,重新拉取消费.

ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,
                                this.processQueue, 10);


6、然后就调用当前锁定对象的processQueue.takeMessages获取消息,takeMessages会从msgTreeMap对象中按传入的consumeBatchSize参数,按QueueOffset的大小(从小到大)取数据进行消费,这个过程也是互斥的,不能同时进行,然后执行消费了,如下:

List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);

//....

//真正执行消息的消费
status =
        messageListener.consumeMessage(Collections.unmodifiableList(msgs),
            context);


// 从ProcessQueue中取出batchSize条消息
public List<MessageExt> takeMessags(final int batchSize) {
    List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            // 从treeMap中获取batchSize条数据,每次都返回offset最小的那条并移除
            if (!this.msgTreeMap.isEmpty()) {
                for (int i = 0; i < batchSize; i++) {
                    Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                    if (entry != null) {
                        // 放到返回列表和一个临时用的treemapp中
                        result.add(entry.getValue());
                        msgTreeMapTemp.put(entry.getKey(), entry.getValue());
                    }
                    else {
                        break;
                    }
                }
            }

            // 取到消息了就会开始进行消费,如果没取到,则不需要消费,那么consuming设为false
            if (result.isEmpty()) {
                consuming = false;
            }
        }
        finally {
            this.lockTreeMap.writeLock().unlock();
        }
    }
    catch (InterruptedException e) {
        log.error("take Messages exception", e);
    }

    return result;
}


7、ConsumeOrderlyStatus记录了消费执行的结果,根据不同的结果,要么进行consumeOffset提交、要么对消息重新消费,这里展示一个自动提交的(注意:手动提交需要业务自己把握ConsumeOrderlyStatus里四个的四个含义,根据实际的处理结果进行返回

continueConsume =
        ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status,
            context, this);


boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
    switch (status) {
    case COMMIT:
    case ROLLBACK:
        log.warn(
            "the message queue consume result is illegal, we think you want to ack these message {}",
            consumeRequest.getMessageQueue());
    case SUCCESS:
        //提交本次消费的consumeOffset
        commitOffset = consumeRequest.getProcessQueue().commit();
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup,
            consumeRequest.getMessageQueue().getTopic(), msgs.size());
        break;
    case SUSPEND_CURRENT_QUEUE_A_MOMENT:
        //重新消费该批消息
        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
        this.submitConsumeRequestLater(//
                consumeRequest.getProcessQueue(), //
                consumeRequest.getMessageQueue(), //
                context.getSuspendCurrentQueueTimeMillis());
        continueConsume = false;

        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
                consumeRequest.getMessageQueue().getTopic(), msgs.size());
        break;
    default:
        break;
    }
}


consumeOffset提交到broker,代码如下:

if (commitOffset >= 0) {
//consumeOffset提交到broker
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),
        commitOffset, false);
}


如果返回到是SUSPEND_CURRENT_QUEUE_A_MOMENT,这个里面会把本次消费的消息重新放回到ProcessQueue中的msgTreeMap然后由下个线程接着消费,为什么说是下个线程呢? 因为这里会把continueConsume设置为false,返回时,执行当前ConsumeRequest进行消费的线程就会完成消费,下一个获取当前MessageQueue的锁的线程接着处理当前的processQueue(不同MessageQueue对应的processQueue永远是同一个对象),代码如下:

//重新消费该批信息
//makeMessageToCosumeAgain在顺序消费客户端返回消息状态为SUSPEND_CURRENT_QUEUE_A_MOMENT时调用;
//将消息从msgTreeMapTemp移除,并将该批消息重新放入msgTreeMap。
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            for (MessageExt msg : msgs) {
                this.msgTreeMapTemp.remove(msg.getQueueOffset());
                this.msgTreeMap.put(msg.getQueueOffset(), msg);
            }
        }
        finally {
            this.lockTreeMap.writeLock().unlock();
        }
    }
    catch (InterruptedException e) {
        log.error("makeMessageToCosumeAgain exception", e);
    }
}


文章转载自不修边幅的创客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论