暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

万字详解RocketMQ使用

IT那活儿 2025-04-22
55

点击上方“IT那活儿”公众号--专注于企业全栈运维技术分享,不管IT什么活儿,干就完了!!!


  
消息中间件以前常用RabbitMQ和ActiveMQ,由于业务需要,后期业务偏向大数据,现着重学习一下RocketMQ(RocketqMQ原理同ctg-mq),后续更新Kafka。
推荐阅读:《RocketMQ两主两从部署搭建



RocketMQ特性

1.1 Kafka特性(高性能分布式)

吞吐量大,支持消息大量推挤,支持topic离线,支持分布式,使用ZooKeeper实现负载均衡,支持Hadoop数据并行加载。

1.2 RocketMQ特性

  • 1)Broker 服务器

    Broker 服务器是RocketMQ的核心。

    主要功能:消息的处理(接收生产者Producer发送过来的消息(持久化),推送消息给消费者Consumer),消息的存储。NameServer 服务器:记录Producert信息、Broker信息、Consumer信息、Topic主题信息,NameServer服务器在这里作为控制中心、注册中心、路由,服务启动顺序,先启动NameServer再启动Broker,将Broker服务器的ip注册到NameServer服务中。

    业务流程:如果生产者Producer需要推送消息至Broker服务器中,需要先去NameServer服务器中查找到对应的Broker服务器,然后生产者端Producer与Broker服务器建立连接。

  • 2)能够保证严格的消息顺序(顺序消费、顺序拉取)

    丰富的消息拉取模式:push模式(等待Broker推送消息,推荐使用,),pull模式(主动向Broker主动拉取消息),pull与push模式同时可以满足使用需求的情况下,建议优先使用push模式。

  • 3)可以多节点生产和多节点消费;

  • 4)消息事务机制,目前只有RocketMQ支持,Kafka和RabbitMQ不支持;

  • 5)亿级消息堆积;

  • 6)吞吐量高,但比Kafka低;

  • 7)消息重推、死信队列。

1.3 RabbitMQ特性

吞吐量比Kafka、RocketMQ低。


RocketMQ消费模式

2.1 Push推模式-DefaultMQPushConsumer原理

Consumer消费者向Broker服务器发送请求,Consumer通过请求与Broker服务器保持一种长连接的形式,Broker服务器每5s检查一次是否存在消息,如果有就推送给Consumer消费者。

2.2 Pull拉模式-DefaultMQPullConsumer原理

Consumer消费者主动去Broker服务器拉取数据,一般使用本地定时任务去拉取,由于需要保证消息的及时性,一般推荐使用Push推模式订阅消息。

2.3 轮询监控机制

RocketMQ默认将Producer生产者消息发送至4(不一定4个)个队列中进行存储,Consumer消费方通过轮询的方式去监控这个4个队列(轮询监控机制)。

2.4 ack机制

LocalTransactionState标识消息的状态,通过判断返回的枚举值enum做出相应处理。

  • 1)COMMIT_MESSAGE

    消息可见,目前事务消息分为提交不可见消息和可见消息。

  • 2)ROLLBACK_MESSAGE

    消息需要回滚。

  • 3)UNKNOW

    消息异常或超时时返回该枚举值,重复回查信息。


RocketMQ实战

3.1 消息发送实现流程

1)引入pom依赖,目前最新版本为5.3.0,推荐使用4.4.0

<dependency> 
<groupId>org.apache.rocketmq</groupId> 
<artifactId>rocketmq-client</artifactId> 
<version>版本</version> 
</dependency>

<dependency> 
<groupId>com.alibaba.cloud</groupId>   <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> 
</dependency>

2)创建DefualtMQProducer实例对象
3)设置NaemServer地址
4)开启DefaultMQProducer
5)创建消息Message
6)发送消息
7)关闭DefaultMQProducer
package com.rocketmq.demo1.demos.rocketmq.study.producer;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author durunwu
 * @data 2024/8/17
 */

publicclassRocketMQProducer{

    /**
     * RocketMQ生产者发送消息
     */

    publicstaticvoidmain(String[] args)throws MQClientException,
                                                  UnsupportedEncodingException,
                                                  MQBrokerException,
                                                  RemotingException,
                                                  InterruptedException 
{
        //RocketMQ消息生产者和消费者都是一个组的概念(Producer集群、Consuemr集群),这样可以支持大量消息的生产和消费

        //1.创建DefualtMQProducer实例对象,参数:组名
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");

        //2.设置NaemServer地址
        producer.setNamesrvAddr("192.168.211.141:9876");
        
        //3.开启DefaultMQProducer
        producer.start();
        
        /*
            4.创建消息Message
            String topic 主题名称:当前消息是哪一类(哪一主题)的数据,这个传入主题对应的名称
            String tags 标签:通过当前标签可以查询到对应的消息或特定的消息,tags主要是用于过滤,来分类&标记消息的
            String keys 消息的唯一值:类似数据库记录的id,可以通过这个Keys获取到对应的消息信息,定位消息信息
            byte[] body 消息信息内容:需要传入字节数组格式
         */

        Message message = new Message("Topic_Demo",
                                       "Tags",
                                       "Keys_1",
                                            "hello!".getBytes(RemotingHelper.DEFAULT_CHARSET));

        //5.发送消息
        SendResult result = producer.send(message);
        System.out.println("打印消息内容:" + result);

        //6.关闭DefaultMQProducer
        producer.shutdown();

    }


}

消息发送之后,查看RocketMQ控制台,选到代码里的Topic主题(Topic_Demo),然后选择消息推送的时间,就可以查看到当前发现的消息记录:

3.2 消息消费流程实现

目前业务需要实现消费业务,着重学习消费端逻辑。

  • 创建DefaultMQPushConsumer;

  • 设置NameServer地址;

  • 设置subscribe,这里是要读取的主题信息;

  • 创建监听器MessageListener;

  • 获取消息信息;

  • 返回消息读取状态。

消费者流程代码,这里使用Push推模式实现,并设置了消息拉取最大上限setConsumeMessageBatchMaxSize(2)为2条消息,监听器选择普通监听器MessageListenerConcurrently,如果需要实现顺序消费可以选用MessageListenerOrderly,消息消费时如果有异常出现,注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发RocketMQ消息重推机制,如果消息消费成功,只需返回枚举类。

enum ConsumeConcurrentlyStatus.CONSUME_SUCCESS即可表示消息推送成功。

publicclass Consumer {
    publicstaticvoid main(String[] args) throws MQClientException {

        //1. 创建DefaultMQPushConsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        //2. 设置NameServer地址
        consumer.setNamesrvAddr("192.168.211.141:9876");
        //3. 设置subscribe,这里是要读取的主题信息
        consumer.subscribe("Topic_Name",//执行要消费的主题
                "Tags || TagsA || TagsB");//过滤规则 "*"则表示全部订阅
        //4. 创建监听器MessageListener
        //4.1 设置消息拉取最大数(上限)-最大拉取两条
        consumer.setConsumeMessageBatchMaxSize(2);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            /*
                MessageListenerConcurrently 普通消息的接收
                MessageListenerOrderly 顺序消息的接收
             */

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                /**
                 * List<MessageExt> msgs 可以从Broker获取多条数据,默认是32条,可以设置上限
                 */

                //5. 获取消息信息
                //迭代消息信息
                for (MessageExt msg : msgs) {
                    //获取主题
                    String topic = msg.getTopic();
                    //获取标签
                    String tags = msg.getTags();

                    //获取信息
                    byte[] body = msg.getBody();
                    try {
                        String result = newString(body, RemotingHelper.DEFAULT_CHARSET);

                        //todo 实现业务...
                        System.out.println("Consumer消费信息--topic: " + topic + ", tags: " + tags + ", result: "+ result);

                    } catch (UnsupportedEncodingException e) {
                        //throw new RuntimeException(e);
                        //注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发重推机制
                        e.printStackTrace();
                        //消息消费失败,重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    //6. 返回消息读取状态
                    //消息消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                }
                returnnull;
            }
        });

        //开启RockerMQ消费端
        consumer.start();

    }

}

3.3 消息顺序发送流程实现

因为上面说过,普通的RocketMQ消息是发送到4个队列中,这里RocketMQ也不能保住全局的消息顺序,只能保证局部的顺序。

为什么这里无法保住全局的消息顺序呢,假如我发送三条消息,可能同时存入到不同的三个队列中,因为是同步存入队列,所以无法确定消息的顺序。

要保住消息的顺序只能将消息发送到同一个队列中(指定一个队列),因为单个队列肯定是能够保住消息的顺序的。

RocketMQ消息生产者和消费者都是一个组的概念(Producer集群、Consuemr集群),这样可以支持大量消息的生产和消费。

  • 1)创建DefualtMQProducer实例对象,参数:组名;

  • 2)设置NaemServer地址;

  • 3)开启DefaultMQProducer;

  • 4)创建消息Message

    String topic 主题名称:当前消息是哪一类(哪一主题)的数据,这个传入主题对应的名称。

    String tags 标签:通过当前标签可以查询到对应的消息或特定的消息,tags主要是用于过滤,来分类&标记消息的。

    String keys 消息的唯一值:类似数据库记录的id,可以通过这个Keys获取到对应的消息信息,定位消息信息。

    byte[] body 消息信息内容:需要传入字节数组格式。

  • 5)发送消息;

  • 6)关闭DefaultMQProducer。

package com.rocketmq.demo1.demos.rocketmq.study.producer;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;

publicclassOrderProducer{
    publicstaticvoidmain(String[] args)throws MQClientException,
            UnsupportedEncodingException,
            MQBrokerException,
            RemotingException,
            InterruptedException 
{
        //RocketMQ消息生产者和消费者都是一个组的概念(Producer集群、Consuemr集群),这样可以支持大量消息的生产和消费

        //1.创建DefualtMQProducer实例对象,参数:组名
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");

        //2.设置NaemServer地址
        producer.setNamesrvAddr("192.168.211.141:9876");

        //3.开启DefaultMQProducer
        producer.start();

        /*
            4.创建消息Message
            String topic 主题名称:当前消息是哪一类(哪一主题)的数据,这个传入主题对应的名称
            String tags 标签:通过当前标签可以查询到对应的消息或特定的消息,tags主要是用于过滤,来分类&标记消息的
            String keys 消息的唯一值:类似数据库记录的id,可以通过这个Keys获取到对应的消息信息,定位消息信息
            byte[] body 消息信息内容:需要传入字节数组格式
         */


        //5.发送消息
        //发送5条消息
        for (int i = 0; i < 5; i++) {

            Message message = new Message("Topic_Order_Demo",
                    "Tags",
                    "Keys_" + i,
                    ("hello!" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            /*
                5.1顺序发送消息,将消息发送到指定的队列中
                参数1:消息信息Message
                参数2:指定要发送的队列对象MessageQueueSelector
                参数3:指定队列的下标,这里传的0表示第一个队列,传1表示第2个队列
            */

            SendResult result = producer.send(message, new MessageQueueSelector() {
                        /*
                            List<MessageQueue> mqs
                            返回当前队列的信息,如果设置的4个队列,就会返回这4个队列的信息
                            如果是集群环境,这里队列的个数=集群数*队列数

                            Object arg
                            会将参数3:arg:0 传入到select方法中去查询第一个队列
                         */

                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg){
                            //获取队列的下标
                            Integer index = (Integer) arg;
                            //返回第一个队列
                            MessageQueue messageQueue = mqs.get(index);
                            return messageQueue;
                        }
                    },
                    0);

            
            
            System.out.println("打印消息内容:" + result);
        }



        //6.关闭DefaultMQProducer
        producer.shutdown();

    }

}

队列index对应queueId:

3.4 消息顺序消费流程实现

生产者设置消息的顺序发送:

消费者需要设置监听器为MessageListenerOrderly消息顺序监听器,并且返回类型需要变更为ConsumeOrderlyStatus:

  • 消息发送失败,需要重试则返回

    ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。

  • 消息发送成功,则返回

    ConsumeOrderlyStatus.SUCCESS。

package com.rocketmq.demo1.demos.rocketmq.study.listener;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;

publicclass OrderConsumer {
    publicstaticvoid main(String[] args) throws MQClientException {
        //1. 创建DefaultMQPushConsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
        //2. 设置NameServer地址
        consumer.setNamesrvAddr("192.168.211.141:9876");
        //3. 设置subscribe,这里是要读取的主题信息
        consumer.subscribe("Topic_Order_Demo",//执行要消费的主题
                "Tags || TagsA || TagsB");//过滤规则 "*"则表示全部订阅
        //4. 创建监听器MessageListener
        //4.1 设置消息拉取最大数(上限)-最大拉取两条
        consumer.setConsumeMessageBatchMaxSize(2);

        /**
         * 顺序消费
         */

        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                /**
                 * List<MessageExt> msgs 可以从Broker获取多条数据,默认是32条,可以设置上限
                 */

                //5. 获取消息信息
                //迭代消息信息
                for (MessageExt msg : msgs) {
                    //获取主题
                    String topic = msg.getTopic();
                    //获取标签
                    String tags = msg.getTags();

                    //获取信息
                    byte[] body = msg.getBody();
                    try {
                        String result = newString(body, RemotingHelper.DEFAULT_CHARSET);

                        //todo 实现业务...
                        System.out.println("OrderConsumer消费信息--topic: " + topic + ", tags: " + tags + ", result: "+ result);

                    } catch (UnsupportedEncodingException e) {
                        //throw new RuntimeException(e);
                        //注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发重推机制
                        e.printStackTrace();

                        //消息消费失败,重试
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                //6. 返回消息读取状态
                //消息消费成功
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });


        //开启RockerMQ消费端
        consumer.start();

    }

}


RocketMQ事务消息

注:RocketMQ4.3.0版本之后才支持事务消息。

4.1 事务消息流程

第1步:Producer生产者发送Prepare消息(预请求消息)到Broker服务器,写入消息到HalfTopic中,注意这里消费者是无法读取HalfTopic消息的。

第2步:预请求消息发送成功之后会回调执行本地事务操作(保存消息信息到本地数据库,做本地数据库的事务操作)
第3步:如果本地事务执行成功(消息成功Insert到本地并且未回滚),将消息commit到Broker服务器的OpTopic中,然后写到RealTopic中,这里消费者就可以获取到消息了。

到第3步就表示事务消息成功执行了,下面是对消息事务异常做出处理:

第4步:如果消息消费失败,会重试发送消息,如果一直重试失败(这里可能就不是RocketMQ的问题,需要检查本地代码是否存在问题),需要做出补偿操作(补偿机制:多写一份代码,解决当前的问题)

第5步:如果是本地事务执行超时,Producer生产者会返回超时或Unknow状态,此时Broker服务器会进行消息回调方法进行事务的回查,检查消息是否执行成功或执行到哪一步了。

4.2 Producer生产者发送事务消息到Broker服务器

指定消息监听对象,用于执行本地事务和消息回查,生产者Producer需要指定线程池:

TransactionListener transactionListener = new TransactionListenerImpl();

producer.setTransactionListener(transactionListener);

实现代码:

package com.rocketmq.demo1.rocketmq.study.transaction;

import com.rocketmq.demo1.rocketmq.producer.TransactionListenerImpl;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.jetbrains.annotations.NotNull;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author durunwu 
 * 事务消息
 */

publicclassTransactionProducer{
    publicstaticvoidmain(String[] args)throws MQClientException, UnsupportedEncodingException {

        //1.创建DefualtMQProducer实例对象,参数:组名
        TransactionMQProducer producer = new TransactionMQProducer("demo_producer_transaction_group");

        //2.设置NaemServer地址
        producer.setNamesrvAddr("192.168.211.141:9876");

        //指定消息监听对象,用于执行本地事务和消息回查
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);

        //线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                5,
                100,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(@NotNull Runnable r){
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    }
                }
        );
        //设置线程池
        producer.setExecutorService(executor);

        //3.开启DefaultMQProducer
        producer.start();

        /*
            4.创建消息Message
            String topic 主题名称:当前消息是哪一类(哪一主题)的数据,这个传入主题对应的名称
            String tags 标签:通过当前标签可以查询到对应的消息或特定的消息,tags主要是用于过滤,来分类&标记消息的
            String keys 消息的唯一值:类似数据库记录的id,可以通过这个Keys获取到对应的消息信息,定位消息信息
            byte[] body 消息信息内容:需要传入字节数组格式
         */

        Message message = new Message("Topic_Transaction_Demo",
                "Tags",
                "Keys_1",
                "hello!-Transaction".getBytes(RemotingHelper.DEFAULT_CHARSET));


        //5.发送消息-发送事务消息即可
        TransactionSendResult result = producer.sendMessageInTransaction(message,"hello-transaction");

        System.out.println("打印消息内容:" + result);

        //6.关闭DefaultMQProducer
        producer.shutdown();

    }
}

4.3 消息写成功执行本地事务,在TransactionListenerImpl上实现

public class TransactionListenerImpl implements TransactionListener{}

1)使用ConcurrentHashMap记录本地事务执行状态

  • 0:执行中,状态未知;

  • 1:本地事务执行成功;

  • 2:本地事务执行失败。

2)LocalTransactionState
  • 事务执行成功返回commit

    LocalTransactionState.COMMIT_MESSAGE

  • 事务执行超时返回UNKNOW消息将重新发送

    LocalTransactionState.UNKNOW

  • 事务需要回滚返回rollback

    LocalTransactionState.ROLLBACK_MESSAGE

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;


/**
 * @author durunwu
 */

@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5,
        maximumPoolSize = 10)
publicclass TransactionListenerImpl implements TransactionListener{

    //存储对应事务的状态信息 key:事务id,value:当前事务的执行状态
    private ConcurrentHashMap<String,Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * @author durunwu
     * 2.回调方法
     * (1)获取Broker事务id
     * (2)执行本地事务
     */

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //(1)获取Broker事务id
        String transactionId = msg.getTransactionId();

        /*
            0:执行中,状态未知
            1:本地事务执行成功
            2:本地事务执行失败
         */

        localTrans.put(transactionId,0);

        //todo (2)执行本地事务
        //>>>>>业务执行,处理本地事务,service
        System.out.println("helli!---Demo---Transaction");
        //模拟本地事务

        try {
            System.out.println("正在执行本地事务---");
            Thread.sleep(12000);
            System.out.println("正在执行本地事务---成功");
            //本地事务执行成功
            localTrans.put(transactionId,1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            //本地事务执行失败
            localTrans.put(transactionId,2);
            //消息事务回滚
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        /*
            LocalTransactionState
            事务执行成功返回commit: LocalTransactionState.COMMIT_MESSAGE
            事务执行超时返回UNKNOW消息将重新发送:LocalTransactionState.UNKNOW
            事务需要回滚返回rollback: LocalTransactionState.ROLLBACK_MESSAGE
         */

        //消息顺利执行成功,返回commit
        return LocalTransactionState.COMMIT_MESSAGE;

    }

}

4.4 事务回查,Broker服务器回查Producer生产者事务

消息事务回查,每分钟回查一次?

/**
 * @author durunwu
 */

@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5,
        maximumPoolSize = 10)
publicclassTransactionListenerImplimplementsTransactionListener{

    //存储对应事务的状态信息 key:事务id,value:当前事务的执行状态
    private ConcurrentHashMap<String,Integer> localTrans = new ConcurrentHashMap<>();


    /**
     * @author durunwu
     * 消息事务回查
     * 每分钟回查一次?
     */

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg){
        //(1)获取Broker事务id
        String transactionId = msg.getTransactionId();

        //获取当前事务执行状态
        Integer state = localTrans.get(transactionId);

        System.out.println("消息回查---transactionId:" + transactionId + ",状态: " + state);

        switch (state) {
            case0:
                //消息再次执行,重发
                return LocalTransactionState.UNKNOW;
            case1:
                //消息事务执行成功,commit
                return LocalTransactionState.COMMIT_MESSAGE;
            case2:
                //消息事务执行失败,rollback回滚
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;;
    }
}

4.5 RocketMQ实现分布式事务流程

MQ事务消息解决分布式事务问题,但第三方MQ支持事务消息的中间件不多,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如RabbitMQ和Kafka都不支持。

以阿里的RocketMQ中间件为例,其思路大致为:

  • 第一阶段Prepared消息,会拿到消息的地址。

  • 第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。

也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。


END


本文作者:杜润伍(上海新炬中北团队)

本文来源:“IT那活儿”公众号

文章转载自IT那活儿,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论