暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ之msgId查询逻辑

不修边幅的创客 2020-09-17
1713

点击上方蓝色字体,选择“设为星标”


1

RocketMQ的msgId生成
当生产者(producer)发送一条消息时,消息的msgId是在消息发送到broker并落盘时生成的,然后返回给生产者.
broker中是由SendMessageProcessor的processRequest处理的,代码如下:
 @Override
  public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext = null;
    switch (request.getCode()) {
    case RequestCode.CONSUMER_SEND_MSG_BACK:
      return this.consumerSendMsgBack(ctx, request);
    default:
      SendMessageRequestHeader requestHeader = parseRequestHeader(request);
      if (requestHeader==null) {
        return null;
      }
      // 消息轨迹:记录到达 broker 的消息
      mqtraceContext = buildMsgContext(ctx, requestHeader);
      this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
      //处理生产者发送来的消息
      final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
      // 消息轨迹:记录发送成功的消息
      this.executeSendMessageHookAfter(response, mqtraceContext);
      return response;
    }
  }


this.sendMessage(ctx, request, mqtraceContext, requestHeader)内部会把消息封装成MessageExtBrokerInner,然后调用MessageStore的putMessage进行保存,代码如下:
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
            final RemotingCommand request,//
            final SendMessageContext mqtraceContext,//
            final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

     //省略代码
     MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
     //处理消息
     PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
     //省略代码
}


在putMessage中会对消息进行一些合法性校验,然后调用CommitLog的putMessage对消息进行存储,代码如下:
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
     //省略非讲解代码
     PutMessageResult result = this.commitLog.putMessage(msg);
    //省略非讲解代码
}


在CommitLog的putMessage中找到对应的当前正在使用的MapedFile,然后调用MapedFile的appendMessage进行消息的落盘,具体的实施是由doAppend方法实施,在doAppend方法中,  由MessageDecoder.createMessageId生成,该方法需要三个参数,如下所示:

参数1(ByteBuffer) : 用于存储当前broker实例的host+port和消息的phyoffset.

参数2(ByteBuffer) : host+port的ByteBuffer.

参数3(long) : 消息在文件中的phyoffset.

如代码所示:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

  //省略非讲解代码
  result = mapedFile.appendMessage(msg, this.appendMessageCallback);
  //省略非讲解代码
}

public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
        assert msg != null;
        assert cb != null;

        int currentPos = this.wrotePostion.get();

        // 表示有空余空间
        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            //往MapedBuffer追加消息
            AppendMessageResult result =
                    cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
            this.wrotePostion.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
}

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
                final int maxBlank, final Object msg)
 
{
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
            MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;
            // PHY OFFSET
            long wroteOffset = fileFromOffset + byteBuffer.position();
            //msgId生成
            String msgId =
                    MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(),
                        wroteOffset);
            //省略非讲解代码
}

思考一下:假如我们给一个msgId,我们就可以从这个msgId中解析出broker的host和port,然后对这个broker发送请求,请求中带着commitLog的phyoffset就可以很快检索到消息了.

2

根据msgId检索消息

RocketMQ console中,提供了根据msgId查询的入口,你只需要输入一个msgId即可查询,如下图:

RocketMQ提供了一些对外的可以操作nameserver和broker的命令接口,详情查看SubCommand的具体实现.当点击提交以后,msgId查询会使用QueryMsgByIdSubCommand这个子命令,在其内部经过一系列的调用,数据最终会由MQAdminImpl的viewMessage(String msgId)方法处理,该方法通过MessageDecoder对msgId进行解析,得到broker的addr(host+port)和消息的在broker的物理地址:phyoffset.如代码所示:

public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException,
            InterruptedException, MQClientException
{
        try {
            //解析出broker地址和phyoffset
            MessageId messageId = MessageDecoder.decodeMessageId(msgId);
            //调用另外一个重载viewMessage重载方法得到消息
            return this.mQClientFactory.getMQClientAPIImpl().viewMessage(
                RemotingUtil.socketAddress2String(messageId.getAddress()), messageId.getOffset(), 1000 * 3);
        }
        catch (UnknownHostException e) {
            throw new MQClientException("message id illegal", e);
        }
    }


viewMessage重载方法代码所示:

public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
            throws RemotingException, MQBrokerException, InterruptedException
{
        ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
        requestHeader.setOffset(phyoffset);
        RemotingCommand request =
                RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
        //与broker建立链接,然后把phyoffset作为请求参数,即可得到消息
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
            MessageExt messageExt = MessageDecoder.decode(byteBuffer);
            if (!UtilAll.isBlank(projectGroupPrefix)) {
                messageExt.setTopic(VirtualEnvUtil.clearProjectGroup(messageExt.getTopic(),
                    projectGroupPrefix));
            }
            return messageExt;
        }
        default:
            break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }


这种查询消息的最终被broker的QueryMessageProcessor的processRequest处理,如下代码所示:

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException
{
        switch (request.getCode()) {
        case RequestCode.QUERY_MESSAGE:
            //按照Message Key查询消息
            return this.queryMessage(ctx, request);
        case RequestCode.VIEW_MESSAGE_BY_ID:
            //按照MessageId查询消息
            return this.viewMessageById(ctx, request);
        default:
            break;
        }

        return null;
}


接下来继续看,this.viewMessageById方法,方法内部从请求中解析出commitLogOffset然后调用MessageStore的selectOneMessageByOffset方法取出消息,如下代码:
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException
{
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ViewMessageRequestHeader requestHeader =
                (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
        //省略非讲解代码
        final SelectMapedBufferResult selectMapedBufferResult =
                this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
        //省略非讲解代码
}


@Override
public SelectMapedBufferResult selectOneMessageByOffset(long commitLogOffset) {
        SelectMapedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
        if (null != sbr) {
            try {
                // 1 TOTALSIZE
                int size = sbr.getByteBuffer().getInt();
                return this.commitLog.getMessage(commitLogOffset, size);
            }
            finally {
                sbr.release();
            }
        }
        return null;
}


文章转载自不修边幅的创客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论