
Photo by: 双儿
Introduction
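In the article <RocketMQ Transaction Messages>, we introduced the basic principle behind RocketMQ's transactions and gave a simple demo. In this article we read the source code to see how it is actually implemented. To make the analysis easier to follow, here is the transaction listener implemented in the previous article: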
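```java
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
// TestOrder and TestOrderService come from the demo project of the previous article

@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Resource
    private TestOrderService testOrderService;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {
        log.info("Executing local transaction, message: {}", JSONObject.toJSONString(message));
        // Run the local business logic; if it succeeds, tell the Broker to commit the message for consumers
        TestOrder testOrder = (TestOrder) o;
        try {
            boolean success = testOrderService.createOrder(testOrder);
            return success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.UNKNOWN;
        } catch (Exception e) {
            log.info("Local transaction failed: {}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {
        log.info("check local transaction ");
        // Check-back method the Broker calls to query the transaction status;
        // under normal circumstances it is not invoked
        // rocketmq-spring passes the raw message body (byte[]) as the payload, so a (String) cast fails;
        // this assumes the body is the order ID encoded as UTF-8
        String orderId = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
        TestOrder testOrder = testOrderService.queryByOrderId(orderId);
        if (ObjectUtil.isNotNull(testOrder)) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}
```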
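As the implementation shows, we mainly implement the interface RocketMQLocalTransactionListener. However, this interface is a wrapper provided by rocketmq-spring-boot-starter. You can see how it is actually used in the sources of RocketMQTransactionConfiguration and RocketMQUtil: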
```java
@Configuration
public class RocketMQTransactionConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

    // omitted ....

    private void registerTransactionListener(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());
        }
        RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
        if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {
            throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");
        }
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),
            annotation.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(annotation.blockingQueueSize())));
        // Core step: the listener is converted with RocketMQUtil
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
        log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
    }
}

// In RocketMQUtil: the core conversion logic
public static TransactionListener convert(RocketMQLocalTransactionListener listener) {
    // Wrap it as a native TransactionListener
    return new TransactionListener() {
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object obj) {
            RocketMQLocalTransactionState state = listener.executeLocalTransaction(convertToSpringMessage(message), obj);
            return convertLocalTransactionState(state);
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            RocketMQLocalTransactionState state = listener.checkLocalTransaction(convertToSpringMessage(messageExt));
            return convertLocalTransactionState(state);
        }
    };
}
```
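So the core of RocketMQ's implementation is the TransactionListener interface. Now let's turn to today's topic and see how RocketMQ implements transactions around TransactionListener.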
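RocketMQUtil.convert() is an adapter. It wraps the Spring-side listener and translates each returned state into RocketMQ's native enum. Below is a self-contained sketch of that idea, not the starter's code. The enum constants copy the names seen above: COMMIT/ROLLBACK/UNKNOWN for RocketMQLocalTransactionState, and COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW for LocalTransactionState. The interfaces SpringStyleListener and NativeListener, and the use of plain String messages, are hypothetical simplifications.

```java
// Stand-in for rocketmq-spring's RocketMQLocalTransactionState
enum SpringTxState { COMMIT, ROLLBACK, UNKNOWN }

// Stand-in for RocketMQ's LocalTransactionState (UNKNOW is the library's own spelling)
enum NativeTxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical simplified listener shapes; messages are plain Strings here
interface SpringStyleListener {
    SpringTxState executeLocalTransaction(String message, Object arg);

    SpringTxState checkLocalTransaction(String message);
}

interface NativeListener {
    NativeTxState executeLocalTransaction(String message, Object arg);

    NativeTxState checkLocalTransaction(String message);
}

final class ListenerAdapter {
    private ListenerAdapter() {
    }

    // Map each Spring-side state onto the native one
    static NativeTxState convertState(SpringTxState state) {
        switch (state) {
            case COMMIT:
                return NativeTxState.COMMIT_MESSAGE;
            case ROLLBACK:
                return NativeTxState.ROLLBACK_MESSAGE;
            default:
                return NativeTxState.UNKNOW;
        }
    }

    // Wrap the Spring-style listener so it can be handed to a native producer
    static NativeListener convert(SpringStyleListener listener) {
        return new NativeListener() {
            @Override
            public NativeTxState executeLocalTransaction(String message, Object arg) {
                return convertState(listener.executeLocalTransaction(message, arg));
            }

            @Override
            public NativeTxState checkLocalTransaction(String message) {
                return convertState(listener.checkLocalTransaction(message));
            }
        };
    }
}
```

Because the adapter only translates states, the Spring listener never handles RocketMQ types, and the producer never handles Spring ones.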
RocketMQ transaction source code
```java
public interface TransactionListener {
    LocalTransactionState executeLocalTransaction(Message var1, Object var2);

    LocalTransactionState checkLocalTransaction(MessageExt var1);
}
```
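The interface requires two methods to be implemented:
executeLocalTransaction: executes the local transaction. Here we insert the order data directly into the database and return the result of the local transaction.
checkLocalTransaction: checks back on the local transaction. Here we query the database for the order ID. If it exists, the transaction is committed. If it does not exist, the local transaction may have failed or may still be running, so UNKNOW is returned.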
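The two callbacks described above can be sketched without a broker or a database. This is a minimal illustration built on several assumptions. An in-memory map (the hypothetical OrderStore) stands in for the orders table. The message body is taken to be the order ID. SimpleTransactionListener is a simplified stand-in for TransactionListener, and its TxState constants mirror LocalTransactionState.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Mirrors the constant names of RocketMQ's LocalTransactionState
enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Simplified stand-in for TransactionListener; the message body is assumed to be the order ID
interface SimpleTransactionListener {
    TxState executeLocalTransaction(String orderId, Object arg);

    TxState checkLocalTransaction(String orderId);
}

// Hypothetical in-memory replacement for the orders table
class OrderStore {
    private final Map<String, String> orders = new ConcurrentHashMap<>();

    void insert(String orderId, String detail) {
        orders.put(orderId, detail);
    }

    boolean exists(String orderId) {
        return orders.containsKey(orderId);
    }
}

class OrderTransactionListener implements SimpleTransactionListener {
    private final OrderStore store;

    OrderTransactionListener(OrderStore store) {
        this.store = store;
    }

    @Override
    public TxState executeLocalTransaction(String orderId, Object arg) {
        try {
            // Local transaction: insert the order record
            store.insert(orderId, String.valueOf(arg));
            return TxState.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            // Insert failed: ask the broker to drop the half message
            return TxState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public TxState checkLocalTransaction(String orderId) {
        // Present: the order was committed. Absent: the local transaction either failed
        // or is still running, so answer UNKNOW and let the broker ask again later
        return store.exists(orderId) ? TxState.COMMIT_MESSAGE : TxState.UNKNOW;
    }
}
```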
Producer: sending transaction messages
Next, let's look at how the RocketMQ producer sends a transaction message, in DefaultMQProducerImpl.sendMessageInTransaction():
```java
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Mark the message as a transaction message, i.e. a half message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    // Send the half message with the normal send method
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // Execute the local transaction
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    // Based on the local transaction result (localTransactionState) and the half-message
    // send result (sendResult), send a commit or rollback request to the Broker
    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // Build the result of the transactional send
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}
```
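First, the property PROPERTY_TRANSACTION_PREPARED is added to the outgoing message to mark it as a transaction message, that is, a half message. The message is then sent to the Broker just like a normal message. If the send succeeds (SEND_OK), the producer calls executeLocalTransaction() on the TransactionListener implementation we provided to run the local transaction. In our example, that means inserting an order record into the database.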
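The state handling in sendMessageInTransaction() reduces to a small pure function. The sketch below is a simplified model, not client code. SendStatus and LocalState copy the constant names from the switch above. The local transaction is passed in as a Supplier, which is an assumption made for illustration. The sketch reproduces the rules visible in the source:
- Only SEND_OK runs the local transaction.
- A null result or a thrown exception leaves the state at UNKNOW.
- The flush and slave failure statuses set ROLLBACK_MESSAGE.

```java
import java.util.function.Supplier;

// Constant names copied from RocketMQ's SendStatus and LocalTransactionState
enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

final class HalfMessageDecision {
    private HalfMessageDecision() {
    }

    // Decide the transaction state once the half message has been sent
    static LocalState decide(SendStatus status, Supplier<LocalState> localTransaction) {
        LocalState state = LocalState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    LocalState result = localTransaction.get();
                    // A null result is treated as UNKNOW
                    state = (result == null) ? LocalState.UNKNOW : result;
                } catch (Throwable e) {
                    // A failing local transaction leaves the state at UNKNOW;
                    // the broker's check-back settles it later
                    state = LocalState.UNKNOW;
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The broker reported a flush/replication problem: skip the
                // local transaction and roll back
                state = LocalState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        return state;
    }
}
```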
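Finally, the producer decides whether to commit or roll back the transaction based on two things: the result of sending the half message and the result of the local transaction. In endTransaction(), the producer simply sends a one-way RPC request to the Broker, telling it to commit or roll back. If the state is UNKNOW, the request asks for neither, and the outcome is left to the check-back. Because the transaction check-back mechanism acts as a safety net, the final outcome of the transaction is unaffected even if this RPC request fails or is lost. Lastly, the method builds the send result of the transaction message and returns it.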
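A toy model shows why a lost endTransaction() request is harmless. In this sketch all names are hypothetical. EndAction stands for the three values the request header can carry: MessageSysFlag.TRANSACTION_COMMIT_TYPE, TRANSACTION_ROLLBACK_TYPE and TRANSACTION_NOT_TYPE. A COMMIT_MESSAGE or ROLLBACK_MESSAGE outcome that reaches the broker resolves the transaction. A lost request or an UNKNOW outcome leaves it pending for the check-back.

```java
import java.util.HashSet;
import java.util.Set;

enum TxOutcome { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical names for TRANSACTION_COMMIT_TYPE, TRANSACTION_ROLLBACK_TYPE and TRANSACTION_NOT_TYPE
enum EndAction { COMMIT, ROLLBACK, NOT_TYPE }

class EndTransactionModel {
    // Transactions whose outcome the toy broker already knows
    private final Set<String> resolved = new HashSet<>();

    static EndAction toAction(TxOutcome outcome) {
        switch (outcome) {
            case COMMIT_MESSAGE:
                return EndAction.COMMIT;
            case ROLLBACK_MESSAGE:
                return EndAction.ROLLBACK;
            default:
                return EndAction.NOT_TYPE;
        }
    }

    // One-way request: the producer never waits for a reply, so it cannot tell whether it arrived
    void endTransaction(String txId, TxOutcome outcome, boolean delivered) {
        if (!delivered) {
            return; // lost on the network
        }
        if (toAction(outcome) != EndAction.NOT_TYPE) {
            resolved.add(txId);
        }
        // NOT_TYPE: nothing is recorded; the outcome is left to the check-back
    }

    // Anything still unresolved is picked up by the periodic check-back
    boolean needsCheckBack(String txId) {
        return !resolved.contains(txId);
    }
}
```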
Broker: handling transaction messages
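Next, let's see how the Broker handles the message after receiving it. The core code is below and lives in org.apache.rocketmq.broker.processor.SendMessageProcessor. Many source-code walkthroughs online analyze the sendMessage() method. If you pull the code yourself, though, you will see that it now uses an asynchronous method: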
```java
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                            SendMessageContext mqtraceContext,
                                                            SendMessageRequestHeader requestHeader) {
    // omitted .....
    CompletableFuture<PutMessageResult> putMessageResult = null;
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // Use the transaction flag to choose between storing a transaction message and a normal message
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        // Store a transaction (half) message
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        // Store a normal message
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
```
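Stripped of the Broker plumbing, the branch in asyncSendMessage() is a routing decision on one message property. The sketch below models just that decision. The class SendRouter and its Route enum are hypothetical. The property key "TRAN_MSG" is assumed to be the value of MessageConst.PROPERTY_TRANSACTION_PREPARED.

```java
import java.util.Map;

final class SendRouter {
    // Assumed value of MessageConst.PROPERTY_TRANSACTION_PREPARED
    static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";

    enum Route { HALF_MESSAGE, NORMAL_MESSAGE, REJECTED }

    private SendRouter() {
    }

    // Same branch as asyncSendMessage(): a "true" flag selects the half-message path
    static Route route(Map<String, String> properties, boolean rejectTransactionMessage) {
        String transFlag = properties.get(PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            // A broker configured to reject transaction messages answers NO_PERMISSION
            return rejectTransactionMessage ? Route.REJECTED : Route.HALF_MESSAGE;
        }
        return Route.NORMAL_MESSAGE;
    }
}
```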
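In asyncSendMessage(), the transaction flag decides whether the message is stored as a transaction message or as a normal message. We are interested in transaction messages, so we follow the call to asyncPrepareMessage(), which hands the message to TransactionalMessageBridge.asyncPutHalfMessage():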
```java
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

// Record the message's real topic and queue ID in its properties, then replace the topic with
// the internal half topic TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC (via buildHalfTopic())
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}
```
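In this code, RocketMQ does not save the half message to the queue the client specified. Instead, it records the original topic and queue, then stores the half message in a special internal topic, RMQ_SYS_TRANS_HALF_TOPIC, always using queue 0. This topic and queue are invisible to consumers, so the messages in them are never consumed. This guarantees that consumers cannot see the half message until the transaction has been committed. At this point the Broker has finished its initial handling of the half message sent by the Producer.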
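The topic swap is easy to model in isolation. The following sketch uses a hypothetical SimpleMsg class instead of MessageExtBrokerInner. The half topic name comes from the text above. The property keys "REAL_TOPIC" and "REAL_QID" are assumed to mirror MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID. toHalf() follows parseHalfMessageInner(), and restore() shows the reverse step a commit needs later.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal message: topic, queue ID and properties
class SimpleMsg {
    String topic;
    int queueId;
    final Map<String, String> props = new HashMap<>();

    SimpleMsg(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

final class HalfTopicSwap {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Assumed to mirror MessageConst.PROPERTY_REAL_TOPIC / PROPERTY_REAL_QUEUE_ID
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    private HalfTopicSwap() {
    }

    // Like parseHalfMessageInner(): remember the real destination, then park the message in queue 0 of the half topic
    static SimpleMsg toHalf(SimpleMsg msg) {
        msg.props.put(REAL_TOPIC, msg.topic);
        msg.props.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    // Reverse step needed on commit: rebuild the message for its real topic and queue
    static SimpleMsg restore(SimpleMsg half) {
        SimpleMsg real = new SimpleMsg(half.props.get(REAL_TOPIC), Integer.parseInt(half.props.get(REAL_QID)));
        real.props.putAll(half.props);
        // Drop the bookkeeping properties from the restored copy
        real.props.remove(REAL_TOPIC);
        real.props.remove(REAL_QID);
        return real;
    }
}
```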
Transaction check-back for half messages
The class org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService defines a periodic task that checks back on the transaction status of half messages. By default it runs once a minute.
```java
@Override
public void run() {
    log.info("Start transaction check service thread!");
    // Interval of the periodic task
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // Transaction message check-back
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
```
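The run()/waitForRunning()/onWaitEnd() trio is RocketMQ's ServiceThread pattern for "wake up every checkInterval and do one round of work". The sketch below swaps in a different technique to illustrate the same schedule: the JDK's ScheduledExecutorService. The class PeriodicChecker is hypothetical. The Runnable stands in for the call to check(timeout, checkMax, listener).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TransactionalMessageCheckService, built on a JDK scheduler
// instead of RocketMQ's ServiceThread (waitForRunning/onWaitEnd)
class PeriodicChecker implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // checkIntervalMillis plays the role of getTransactionCheckInterval() (60 s by default)
    PeriodicChecker(long checkIntervalMillis, Runnable checkRound) {
        scheduler.scheduleWithFixedDelay(() -> {
            long begin = System.currentTimeMillis();
            try {
                checkRound.run(); // corresponds to check(timeout, checkMax, listener)
            } catch (RuntimeException e) {
                // An escaping exception would cancel all later rounds, so only log it
                System.err.println("Check error: " + e);
            }
            System.out.println("check consumed " + (System.currentTimeMillis() - begin) + " ms");
        }, checkIntervalMillis, checkIntervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

One design point carries over. scheduleWithFixedDelay() suppresses all later runs once a task throws, so the sketch catches exceptions per round. This is similar to how check() catches Throwable and only logs it.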
The core logic is the call to check() in onWaitEnd(), which runs each time waitForRunning() returns:
```java
@Override
public void check(long transactionTimeout, int transactionCheckMax,
                  AbstractTransactionalMessageCheckListener listener) {
    try {
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        // Iterate over all half-message queues
        for (MessageQueue messageQueue : msgQueues) {
            // ...... omitted
            while (true) {
                // ...... omitted
                // This is where the check-back actually happens
                if (isNeedCheck) {
                    if (!putBackHalfMsgQueue(msgExt, i)) {
                        continue;
                    }
                    listener.resolveHalfMsg(msgExt);
                } else {
                    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                    log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                        messageQueue, pullResult);
                    continue;
                }
                newOffset = i + 1;
                i++;
            }
            // ...... omitted
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }
}
```
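This code works with the queues of two topics. RMQ_SYS_TRANS_HALF_TOPIC is the half-message topic. RMQ_SYS_TRANS_OP_HALF_TOPIC records messages whose transactions have already finished (committed or rolled back). The code iterates over the messages in the half-message queue. A message that already has a record in RMQ_SYS_TRANS_OP_HALF_TOPIC is treated as done. Otherwise, the code checks whether the message meets the conditions for a check-back. If it does, the code calls listener.resolveHalfMsg() to check back with the producer and handles the message according to the result.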
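RMQ_SYS_TRANS_HALF_TOPIC
The topic for prepare (half) messages. A transaction message first goes into this topic.
RMQ_SYS_TRANS_OP_HALF_TOPIC
When the Broker receives a commit or rollback request for a transaction message, it writes a record for that message to this topic to mark it as processed.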
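The check loop's decision for each half message can be sketched as follows. This is a heavily simplified model, not the Broker's algorithm:
- A Set of offsets stands in for the op topic.
- Each round rescans the whole list instead of tracking consume offsets.
- Check counts are incremented in place. The real service instead writes the message back to the half queue (putBackHalfMsgQueue) before each check-back.

HalfMsg and CheckSketch are hypothetical names. timeout and checkMax play the roles of transactionTimeout and transactionCheckMax.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical half message: queue offset, store timestamp and how many times it was checked
class HalfMsg {
    final long offset;
    final long storeTime;
    int checkTimes;

    HalfMsg(long offset, long storeTime) {
        this.offset = offset;
        this.storeTime = storeTime;
    }
}

final class CheckSketch {
    private CheckSketch() {
    }

    // One simplified round over a half queue kept in store order;
    // doneOffsets plays the role of RMQ_SYS_TRANS_OP_HALF_TOPIC
    static void checkRound(List<HalfMsg> halfQueue, Set<Long> doneOffsets, long now,
                           long timeout, int checkMax,
                           Consumer<HalfMsg> checkBack, Set<Long> discarded) {
        for (HalfMsg msg : halfQueue) {
            if (doneOffsets.contains(msg.offset) || discarded.contains(msg.offset)) {
                continue; // already committed/rolled back, or already given up on
            }
            if (msg.checkTimes >= checkMax) {
                discarded.add(msg.offset); // checked too often: discard it
                continue;
            }
            if (now - msg.storeTime < timeout) {
                break; // too young, and every later message is younger still
            }
            msg.checkTimes++;
            checkBack.accept(msg); // ask the producer via checkLocalTransaction()
        }
    }
}
```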

Broker: handling END_TRANSACTION
```java
// org.apache.rocketmq.broker.processor.EndTransactionProcessor
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }
    OperationResult result = new OperationResult();
    // Commit path for the transaction message
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // Look up the message in the commitlog by commitLogOffset
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Restore the real topic and queue of the transaction message and set the transaction ID
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                // Store the final message so consumers can consume it
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    // "Delete" the prepare message: it is actually stored in the topic
                    // RMQ_SYS_TRANS_OP_HALF_TOPIC to mark it as processed (committed or rolled back)
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    // Rollback path
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // "Delete" the prepare message: store it in RMQ_SYS_TRANS_OP_HALF_TOPIC to mark it as processed
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}
```
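The core logic here is:
Find the message by commitLogOffset.
For a commit, restore the message's original topic and queue and write it to the commitlog again. From there it is dispatched to the consume queue, where consumers can consume it. Then store the original prepare message in the separate topic RMQ_SYS_TRANS_OP_HALF_TOPIC to mark it as processed.
For a rollback, directly store the original prepare message in RMQ_SYS_TRANS_OP_HALF_TOPIC to mark it as processed.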
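The three steps can be condensed into a toy model in which all names are hypothetical. A map keyed by commitlog offset stands in for the commitlog. A list of offsets stands in for the records written to RMQ_SYS_TRANS_OP_HALF_TOPIC. "visibleToConsumers" collects the messages that would reach consumers. Note that neither path physically deletes the half message; the op record is what marks it as processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy broker for the END_TRANSACTION request; every name here is hypothetical
class EndTxBroker {
    // Half message that remembers its real destination topic
    static final class Half {
        final String realTopic;
        final String body;

        Half(String realTopic, String body) {
            this.realTopic = realTopic;
            this.body = body;
        }
    }

    final Map<Long, Half> commitLog = new HashMap<>();   // commitlog offset -> half message
    final List<Long> opRecords = new ArrayList<>();      // stand-in for RMQ_SYS_TRANS_OP_HALF_TOPIC
    final List<String> visibleToConsumers = new ArrayList<>();

    // Returns false when no prepare message exists at the given offset
    boolean endTransaction(long commitLogOffset, boolean commit) {
        Half half = commitLog.get(commitLogOffset); // 1. find the message by commitLogOffset
        if (half == null) {
            return false;
        }
        if (commit) {
            // 2. commit: store it again under its real topic so consumers can see it
            visibleToConsumers.add(half.realTopic + ":" + half.body);
        }
        // 2./3. both paths write an op record; the half message itself is never deleted
        opRecords.add(commitLogOffset);
        return true;
    }
}
```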





