暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ源码解读----事务消息

废材姑娘 2021-02-22
414
点击上方蓝字“废材姑娘”关注我, 让我们一起成长


摄于:By: 双儿


开篇

在文章<RocketMQ的事务消息>一文中, 我们介绍了RocketMQ事务实现的基本原理, 并且给出了一个简单的Demo. 这篇文章我们就要通过源码的解读着重看看其实现的原理. 为了便于后面的分析, 我们把上篇中实现的事务监听拿过来:

@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {


@Resource
private TestOrderService testOrderService;


@Override
public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {
log.info("执行本地事务, 消息:{}", JSONObject.toJSONString(message));


// 执行本地业务逻辑, 如果本地事务执行成功, 则通知Broker可以提交消息让Consumer进行消费


TestOrder testOrder = (TestOrder) o;
try {
boolean success = testOrderService.createOrder(testOrder);
return success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.UNKNOWN;


} catch (Exception e) {
log.info("本地任务执行异常: {}", e.getMessage());


return RocketMQLocalTransactionState.ROLLBACK;
}
}


@Override
public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {
log.info("check local transaction ");
// 提供事务执行状态的回查方法,提供给broker回调
// 正常情况下不会调用到
String orderId = (String) message.getPayload();
TestOrder testOrder = testOrderService.queryByOrderId(orderId);


if (ObjectUtil.isNotNull(testOrder)) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}

从实现中我们可以看到我们主要实现了接口RocketMQLocalTransactionListener, 但是这是rocketmq-spring-boot-starter封装的接口, 它的实际使用可以在源码RocketMQTransactionConfigurationRocketMQUtil中到:

@Configuration
public class RocketMQTransactionConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
// 省略....
private void registerTransactionListener(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);


if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());
}
RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {
throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");
}
((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),
annotation.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(annotation.blockingQueueSize())));
// 核心: 使用RocketMQUtil进行了转换
((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
}
}


// 转换的核心逻辑
public static TransactionListener convert(RocketMQLocalTransactionListener listener) {
// 转换成TransactionListener
return new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object obj) {
RocketMQLocalTransactionState state = listener.executeLocalTransaction(convertToSpringMessage(message), obj);
return convertLocalTransactionState(state);
}


@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
RocketMQLocalTransactionState state = listener.checkLocalTransaction(convertToSpringMessage(messageExt));
return convertLocalTransactionState(state);
}
};
}

所以RocketMQ的实现核心在接口TransactionListener, 下面就进入我们今天的主题, 看看TransactionListener是如何实现事务的.

RocketMQ的事务源码

public interface TransactionListener {
LocalTransactionState executeLocalTransaction(Message var1, Object var2);


LocalTransactionState checkLocalTransaction(MessageExt var1);
}

这个接口需要实现两个方法:

executeLocalTransaction:执行本地事务,在这里我们直接把订单数据插入到数据库中,并返回本地事务的执行结果。

checkLocalTransaction:反查本地事务,在这里我们的处理是,在数据库中查询订单号是否存在,如果存在则提交事务,如果不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回 UNKNOW。


Producer发送事务消息

接下来我们再来看看RocketMQ producer 是如何发送事务消息的.

    public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}


// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}


Validators.checkMessage(msg, this.defaultMQProducer);


SendResult sendResult = null;
// 这个给消息添加属性, 标明消息是事务消息, 也就是半消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

//调用发送消息的方法, 发送这条半消息
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}


LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
// 执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}


if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}


// 根据本地事务返回结果(localTransactionState)和事务消息的结果(sendResult) , 向Broker发送或提交事务消息请求

try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}


// 构造事务消息的返回结果
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

首先给待发送消息添加了一个属性 PROPERTY_TRANSACTION_PREPARED,表明这是一个事务消息,也就是半消息,然后会像发送普通消息一样去把这条消息发送到 Broker 上。如果发送成功了,就开始调用我们之前提供的接口 TransactionListener 的实现类中,执行本地事务的方法 executeLocalTransaction() 来执行本地事务,在我们的例子中就是在数据库中插入一条订单记录。

最后,根据半消息发送的结果和本地事务执行的结果,来决定提交或者回滚事务。在实现方法 endTransaction() 中,producer 就是给 Broker 发送了一个单向的 RPC 请求,告知 Broker 完成事务的提交或者回滚。由于有事务反查的机制来兜底,这个 RPC 请求即使失败或者丢失,也都不会影响事务最终的结果。最后构建事务消息的发送结果,并返回。


Broker处理事务消息

然后, 我们再去看看Broker在收到消息后是如何处理的, 核心代码如下, 这段代码在package org.apache.rocketmq.broker.processor.SendMessageProcessor中, 网上给出的很多源码解读的文章都是在方法SendMessage()方法,自己拉下代码可以看到用的已经是异步的方法了.

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {


//省略.....

CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());

// 根据是否是事务消息的标志, 选择发送事务消息还是普通消息
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// 发送事务消息
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// 发送普通消息
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

这段代码主要是根据是否是事务消息的标志, 决定是进行事务消息的存储, 还是普通消息的存储,  我们关注的是事务消息, 所以顺着21行的代码继续往下看:

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}


// 这里几下了消息的实际队列和这题, 然后又在12行重新set了一个新的主题: TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}

在这段代码中,RocketMQ 并没有把半消息保存到消息中客户端指定的那个队列中,而是记录了原始的主题队列后,把这个半消息保存在了一个特殊的内部主题 RMQ_SYS_TRANS_HALF_TOPIC 中,使用的队列号固定为 0。这个主题和队列对消费者是不可见的,所以里面的消息永远不会被消费。这样,就保证了在事务提交成功之前,这个半消息对消费者来说是消费不到的。到这里,Broker 就初步处理完了 Producer 发送的事务半消息。


半消息事务回查

在org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService类中, 定义了一个定时任务(没分钟执行一次)用于半消息的事务会查.

    @Override
public void run() {
log.info("Start transaction check service thread!");
// 定时任务时间间隔
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}


@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
// 事务消息回查
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

核心的逻辑在底19行, 调用check()方法

   
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
// 遍历所有半消息队列
for (MessageQueue messageQueue : msgQueues) {
// ...... 省略
while (true) {
// .....省略

// 实际去回查的地方
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
//.....省略
} catch (Throwable e) {
log.error("Check error", e);
}


}


这段代码的主要逻辑主要维护两个主题的队列RMQ_SYS_TRANS_HALF_TOPICRMQ_SYS_TRANS_OP_HALF_TOPIC, 前者是半消息队列, 后者是执行结束后存放消息的队列.  遍历半消息队列中的消息,如果已经存在于RMQ_SYS_TRANS_OP_HALF_TOPIC主题中, 则直接标记为完成,  如果没在完成的队列中, 则会判断是否具有回查条件, 如果有, 则回查并根据回查结果进行相应的处理.


  • RMQ_SYS_TRANS_HALF_TOPIC
    prepare消息的主题,事务消息首先先进入到该主题。

  • RMQ_SYS_TRANS_OP_HALF_TOPIC
    当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下。



Broker处理END_TRANSACTION

 // org.apache.rocketmq.broker.processor.EndTransactionProcessor


@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}

OperationResult result = new OperationResult();
//进入事务消息提交处理流程
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//根据commitLogOffset从commitlog文件中查找消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//恢复事务消息的真实的主题、队列,并设置事务ID
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);

//发送最终消息,被consumer消费
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//删除预处理消息(prepare)
//其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)。
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
// 回滚事务的处理流程
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//删除预处理消息(prepare)
//将消息存储在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表该消息已被处理
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}

这里的核心逻辑是:

  • 根据commitlogOffset找到消息

  • 如果是提交动作,就恢复原消息的主题与队列,再次存入commitlog文件进而转到消息消费队列,供消费者消费,然后将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理

  • 回滚消息,则直接将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理






文章转载自废材姑娘,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论