1
How RocketMQ generates the msgId
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext = null;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            // Message trace: record the message arriving at the broker
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            // Handle the message sent by the producer
            final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            // Message trace: record the message after it has been sent
            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        final RemotingCommand request,
        final SendMessageContext mqtraceContext,
        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    // code omitted
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    // Hand the message to the message store
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    // code omitted
}
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // code not relevant to this walkthrough omitted
    PutMessageResult result = this.commitLog.putMessage(msg);
    // code not relevant to this walkthrough omitted
}
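CommitLog's putMessage locates the MapedFile that is currently being written and calls MapedFile's appendMessage to persist the message. The actual write is carried out by the doAppend callback, and it is inside doAppend that the msgId is generated, by MessageDecoder.createMessageId. That method takes three parameters:
Parameter 1 (ByteBuffer): a buffer that holds the current broker instance's host+port together with the message's phyoffset.
Parameter 2 (ByteBuffer): the broker's host+port as a ByteBuffer.
Parameter 3 (long): the message's physical offset (phyoffset), computed as the file's starting offset plus the current write position within the file.
As the code shows: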
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // code not relevant to this walkthrough omitted
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    // code not relevant to this walkthrough omitted
}
public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
    assert msg != null;
    assert cb != null;
    int currentPos = this.wrotePostion.get();
    // The file still has free space
    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        // Append the message to the mapped buffer
        AppendMessageResult result =
                cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
        this.wrotePostion.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    // code not relevant to this walkthrough omitted (the path taken when the file has no free space)
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
        final int maxBlank, final Object msg) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
    MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;
    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();
    // Generate the msgId
    String msgId =
            MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(),
                    wroteOffset);
    // code not relevant to this walkthrough omitted
}
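To make the three parameters concrete, here is a minimal sketch of how such an id could be assembled. It is not RocketMQ's source: it assumes an IPv4 broker address and a 16-byte layout (4-byte IP, 4-byte port, 8-byte phyoffset) rendered as 32 uppercase hex characters, and the class MsgIdSketch and its helper names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch, not RocketMQ code.
// Assumed layout: 4-byte IPv4 address + 4-byte port + 8-byte physical offset.
final class MsgIdSketch {
    static final int MSG_ID_LENGTH = 8 + 8;

    /** Packs an IPv4 address and a port into an 8-byte buffer (parameter 2). */
    static ByteBuffer storeHostBytes(byte[] ipv4, int port) {
        if (ipv4.length != 4) {
            throw new IllegalArgumentException("this sketch only handles IPv4");
        }
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put(ipv4);
        buf.putInt(port);
        buf.flip(); // make the 8 bytes readable from position 0
        return buf;
    }

    /** Writes host+port and phyoffset into the reusable buffer (parameter 1) and hex-encodes it. */
    static String createMessageId(ByteBuffer msgIdMemory, ByteBuffer hostBytes, long phyOffset) {
        msgIdMemory.clear();
        msgIdMemory.put(hostBytes);
        msgIdMemory.putLong(phyOffset);
        return toHex(msgIdMemory.array());
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ByteBuffer msgIdMemory = ByteBuffer.allocate(MSG_ID_LENGTH);
        ByteBuffer host = storeHostBytes(new byte[] {10, 0, 0, 1}, 10911);
        // prints 0A00000100002A9F0000000000000400
        System.out.println(createMessageId(msgIdMemory, host, 1024L));
    }
}
```

Under these assumptions the id is nothing more than an encoded location: it names the broker that stored the message and the position where the message was written.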
2
Looking up a message by msgId
The RocketMQ console provides an entry point for querying by msgId: enter a msgId and submit the query.
[Figure: msgId query form in the RocketMQ console]
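RocketMQ exposes a set of command interfaces for operating the nameserver and the broker; see the implementations of SubCommand for details. After you click submit, the msgId query uses the QueryMsgByIdSubCommand subcommand. After a chain of internal calls, the request ends up in MQAdminImpl's viewMessage(String msgId) method, which uses MessageDecoder to parse the msgId into the broker's addr (host+port) and the message's physical address on that broker, the phyoffset. As the code shows: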
public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    try {
        // Parse the broker address and the phyoffset out of the msgId
        MessageId messageId = MessageDecoder.decodeMessageId(msgId);
        // Call the other viewMessage overload to fetch the message
        return this.mQClientFactory.getMQClientAPIImpl().viewMessage(
                RemotingUtil.socketAddress2String(messageId.getAddress()), messageId.getOffset(), 1000 * 3);
    }
    catch (UnknownHostException e) {
        throw new MQClientException("message id illegal", e);
    }
}
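Parsing the msgId is the reverse of generating it. The sketch below illustrates the idea under the same assumptions as before (IPv4, 32 hex characters split into 4-byte IP, 4-byte port and 8-byte phyoffset); MsgIdDecodeSketch and Decoded are hypothetical names, not the real MessageDecoder or MessageId classes.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch, not RocketMQ code.
// Assumed layout: 8 hex chars IP + 8 hex chars port + 16 hex chars phyoffset.
final class MsgIdDecodeSketch {

    /** Result holder: the broker address as "host:port" plus the physical offset. */
    static final class Decoded {
        final String brokerAddr;
        final long phyOffset;

        Decoded(String brokerAddr, long phyOffset) {
            this.brokerAddr = brokerAddr;
            this.phyOffset = phyOffset;
        }
    }

    static Decoded decode(String msgId) {
        if (msgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters, got " + msgId.length());
        }
        ByteBuffer buf = ByteBuffer.wrap(fromHex(msgId));
        // First four bytes: the IPv4 address, one unsigned octet each
        StringBuilder host = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                host.append('.');
            }
            host.append(buf.get() & 0xFF);
        }
        int port = buf.getInt();       // next four bytes: the port
        long phyOffset = buf.getLong(); // last eight bytes: the physical offset
        return new Decoded(host + ":" + port, phyOffset);
    }

    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        Decoded d = decode("0A00000100002A9F0000000000000400");
        System.out.println(d.brokerAddr + " @ " + d.phyOffset);
    }
}
```

Because the id itself carries the broker address and the offset, the client can contact the right broker directly with the decoded phyoffset.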
The viewMessage overload looks like this:
public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException {
    ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
    requestHeader.setOffset(phyoffset);
    RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
    // Connect to the broker and send the phyoffset as the request parameter to get the message
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
            MessageExt messageExt = MessageDecoder.decode(byteBuffer);
            if (!UtilAll.isBlank(projectGroupPrefix)) {
                messageExt.setTopic(VirtualEnvUtil.clearProjectGroup(messageExt.getTopic(),
                        projectGroupPrefix));
            }
            return messageExt;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}
On the broker side, this query request is ultimately handled by QueryMessageProcessor's processRequest, as the following code shows:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.QUERY_MESSAGE:
            // Query messages by Message Key
            return this.queryMessage(ctx, request);
        case RequestCode.VIEW_MESSAGE_BY_ID:
            // Query a message by MessageId
            return this.viewMessageById(ctx, request);
        default:
            break;
    }
    return null;
}
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ViewMessageRequestHeader requestHeader =
            (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
    // code not relevant to this walkthrough omitted
    final SelectMapedBufferResult selectMapedBufferResult =
            this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
    // code not relevant to this walkthrough omitted
}
@Override
public SelectMapedBufferResult selectOneMessageByOffset(long commitLogOffset) {
    SelectMapedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
    if (null != sbr) {
        try {
            // 1 TOTALSIZE
            int size = sbr.getByteBuffer().getInt();
            return this.commitLog.getMessage(commitLogOffset, size);
        }
        finally {
            sbr.release();
        }
    }
    return null;
}
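selectOneMessageByOffset reads in two steps: it first fetches 4 bytes at the offset to learn the message's total size, then fetches the whole message from the same offset. The following self-contained sketch imitates that pattern on a plain in-memory ByteBuffer. It assumes the 4-byte TOTALSIZE prefix counts itself, and TwoStepReadSketch with its method names is hypothetical, not part of RocketMQ.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch, not RocketMQ code: a length-prefixed log in a heap buffer.
final class TwoStepReadSketch {

    /** Appends a record as 4-byte TOTALSIZE (prefix included) + payload; returns its start offset. */
    static int append(ByteBuffer log, byte[] payload) {
        int offset = log.position();
        log.putInt(4 + payload.length);
        log.put(payload);
        return offset;
    }

    /** Returns an independent view of size bytes starting at offset (a stand-in for getMessage). */
    static ByteBuffer select(ByteBuffer log, int offset, int size) {
        ByteBuffer view = log.duplicate();
        view.limit(offset + size);
        view.position(offset);
        return view.slice();
    }

    /** Step 1: read TOTALSIZE from the first 4 bytes. Step 2: read the whole record. */
    static byte[] readRecord(ByteBuffer log, int offset) {
        int totalSize = select(log, offset, 4).getInt();
        ByteBuffer whole = select(log, offset, totalSize);
        whole.getInt(); // skip the TOTALSIZE prefix
        byte[] payload = new byte[whole.remaining()];
        whole.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer log = ByteBuffer.allocate(64);
        append(log, new byte[] {1, 2, 3});
        int second = append(log, new byte[] {4, 5});
        System.out.println(readRecord(log, second).length);
    }
}
```

The offset alone is enough to recover a message because each stored record begins with its own length; the caller never needs to know the size in advance.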
This article is reposted from 不修边幅的创客.




