暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ之msgKey查询逻辑

不修边幅的创客 2020-09-18
1386

        点击上方蓝色字体,选择“设为星标”


01 msgKey索引生成


在讲之前,我们需要知道生产者(producer)允许针对某条消息设置KEYS,具体可以参考Producer代码和MessageConst代码,其中KEYS可以是有多个关键词构成并且多个key之间使用空格分开的.要知道这一点,因为msgKey索引就是基于这些key生成的.

在前面的文章中:msgId查询逻辑 讲过了生产者生产一条消息到落盘的过程,在上一篇的基础上,我们继续读一些关键代码,然后看看msgKey的生成逻辑.当一条消息落盘成功之后,在CommitLog.putMessage(final MessageExtBrokerInner msg)中消息的相关信息会被封装成: DispatchRequest对象,然后丢给DispatchMessageService进行分发.如下代码所示:


  public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
      //省略其它非讲解代码
       
      //往文件中追加消息
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
       
      //省略其它非讲解d代码
       
     //如果追加成功了,则封装消息到DispatchRequest
     DispatchRequest dispatchRequest = new DispatchRequest(//
topic,// 1
queueId,// 2
result.getWroteOffset(),// 3
result.getWroteBytes(),// 4
tagsCode,// 5
msg.getStoreTimestamp(),// 6
result.getLogicsOffset(),// 7
msg.getKeys(),// 8
/**
* Transaction
*/
msg.getSysFlag(),// 9
msg.getPreparedTransactionOffset());// 10

    //交给DispachMessageService进行分发
this.defaultMessageStore.putDispatchRequest(dispatchRequest);

      //省略其它非讲解d代码
}
public void putDispatchRequest(final DispatchRequest dispatchRequest) {
    //调用DispatchMessageService的putRequest进行分发
this.dispatchMessageService.putRequest(dispatchRequest);
}


在DispatchMessageService.putRequest中,dispatchRequest被放到了List<DispatchRequest> requestWrtie集合中, DispatchMessageService线程在后台一直运行,不断执行doDispatch(),如下代码所示:


public void putRequest(final DispatchRequest dispatchRequest) {
int requestsWriteSize = 0;
int putMsgIndexHightWater =
DefaultMessageStore.this.getMessageStoreConfig().getPutMsgIndexHightWater();
synchronized (this) {
        //把分发请求放到集合中
this.requestsWrite.add(dispatchRequest);
requestsWriteSize = this.requestsWrite.size();
if (!this.hasNotified) {
this.hasNotified = true;
this.notify();
}
}
 }
privatevoid doDispatch() {
if (!this.requestsRead.isEmpty()) {
for (DispatchRequest req : this.requestsRead) {


final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
// 1、分发消息位置信息到ConsumeQueue
switch (tranType) {
case MessageSysFlag.TransactionNotType:
case MessageSysFlag.TransactionCommitType:
// 将请求发到具体的Consume Queue
DefaultMessageStore.this.putMessagePostionInfo(req.getTopic(), req.getQueueId(),
req.getCommitLogOffset(), req.getMsgSize(), req.getTagsCode(),
req.getStoreTimestamp(), req.getConsumeQueueOffset());
break;
case MessageSysFlag.TransactionPreparedType:
case MessageSysFlag.TransactionRollbackType:
break;
}
}
      //消息发送到IndexService服务,构建msgKey的索引
if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.putRequest(this.requestsRead.toArray());
}


this.requestsRead.clear();
}
}


在IndexService的putRequest中,会把请求放到一个LinkedBlockingQueue阻塞队列中,然后由IndexService线程不停的从该队列中取消息,然后建立索引,其中该队列长度30万条消息,如果超过,会丢失,所以基于msgKey有可能存在查不到的情况.代码如下:


private LinkedBlockingQueue<Object[]> requestQueue = new LinkedBlockingQueue<Object[]>(300000);

/**
* 向队列中添加请求,队列满情况下,丢弃请求
*/
public void putRequest(final Object[] reqs) {
boolean offer = this.requestQueue.offer(reqs);
if (!offer) {
if (log.isDebugEnabled()) {
log.debug("putRequest index failed, {}", reqs);
}
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");


while (!this.isStoped()) {
try {
//线程不停的从队列中poll消息,然后建立索引
Object[] req = this.requestQueue.poll(3000, TimeUnit.MILLISECONDS);


if (req != null) {
//不为空,则建立索引
this.buildIndex(req);
}
}
catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}

public void buildIndex(Object[] req) {
boolean breakdown = false;
//获取当前正在写入的索引文件
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
    //循环所有消息,为每一条消息建立索引
MSG_WHILE: for (Object o : req) {
DispatchRequest msg = (DispatchRequest) o;
String topic = msg.getTopic();
        //取出当前消息的所有的keys,keys在文章的最前面说过,每个key用空格分开        String keys = msg.getKeys();
if (msg.getCommitLogOffset() < endPhyOffset) {
continue;
         }
      //如果不为空,使用空格切割字符串,
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
          //循环key,为当前消息构建索引
for (String key : keyset) {
// TODO 是否需要TRIM
if (key.length() > 0) {
                    //除了topic和key,还有消息的commitLogoffset和消息的时间戳,构建索引
                    //所以msgKey支持范围查询
for (boolean ok =
indexFile.putKey(buildKey(topic, key), msg.getCommitLogOffset(),
msg.getStoreTimestamp()); !ok;) {
log.warn("index file full, so create another one, " + indexFile.getFileName());
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
breakdown = true;
break MSG_WHILE;
}

ok =
indexFile.putKey(buildKey(topic, key), msg.getCommitLogOffset(),
msg.getStoreTimestamp());
}
}
}
      }
}
private String buildKey(final String topic, final String key) {
return topic + "#" + key;
}


在IndexFile的putKey中,计算key(topic  + "#" + key)、phyOffset和storeTimeStamp的哈希值,然后写入到索引文件MappedByteBuffer中,至此基于消息keys的索引就建立好了,代码如下:


public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
      //省略了其它非讲解代码
      // 写入真正索引
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
      //省略其它非讲解代码
}


基于msgKey的消息索引的建立流程已经讲过,那如何根据topic和msgKey对消息进行检索呢?


02 使用msgKey检索消息


如下图所示,RocketMQ的console中提供了基于msgKey查询的接口,topic和key必须填写,如下图所示:



提交之后,最终被QueryMsgByKeySubCommand的queryByKey方法执行(RocketMQ Console引入里rocketmq客户端端),默认会加上三个默认参数 : 

参数1 : 最大返回消息数 : 默认64

参数2 : 开始时间戳 : 0

参数3 : 结束时间戳 : Long.MAX_VALUE

代码展示如下:


voidqueryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
throws MQClientException, InterruptedException {
admin.start();
    //查询消息
QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
System.out.printf("%-50s %4s %40s\n",//
"#Message ID",//
"#QID",//
"#Offset");
for (MessageExt msg : queryResult.getMessageList()) {
System.out.printf("%-50s %4d %40d\n", msg.getMsgId(), msg.getQueueId(), msg.getQueueOffset());
}
}


经过一系列的方法调用之后,我们可以看MQAdminImpl.queryMessage,在该方法中,获取当前topic的路由信息(这里注意,RocketMQ Console项目会引入rocketmq客户端,并启动MQClientInstance并与nameserver交互),解析出当前topic所在的broker地址,循环broker地址,封装请求信息:topic、key、maxNum、begin和end信息去broker请求信息,然后把每个broker的查询结果封装到List中,返回给用户,代码如下:


public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
        //获取路由信息
TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
if (null == topicRouteData) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
}
//解析出当前topic所在的broker地址
if (topicRouteData != null) {
List<String> brokerAddrs = new LinkedList<String>();
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
brokerAddrs.add(addr);
}
}


if (!brokerAddrs.isEmpty()) {
final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
//封装所有结果
final List<QueryResult> queryResultList = new LinkedList<QueryResult>();
                //循环每个broker地址,封装请求信息,建立链接,获取消息
for (String addr : brokerAddrs) {
try {
                        //封装请求信息
QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setKey(key);
requestHeader.setMaxNum(maxNum);
requestHeader.setBeginTimestamp(begin);
requestHeader.setEndTimestamp(end);
                        //请求broker
this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader,
                            1000 * 15new InvokeCallback() {
                             public void operationComplete(ResponseFuture responseFuture) {
                             //省略其它非讲解d代码
                            
                             List<MessageExt> wrappers =
MessageDecoder.decodes(
ByteBuffer.wrap(response.getBody()), true);
                                         //封装结果




QueryResult qr =
new QueryResult(responseHeader
.getIndexLastUpdateTimestamp(), wrappers);
                                        //List 保存结果
                                        queryResultList.add(qr);

                                         //省略其它非讲解代码
                             }
                     }
                     //省略了很多代码
            }
      }
}


在broker上,有单独的类来处理查询消息的请求,跟msgId的位置一样,在类QueryMessageProcessor的processRequest中,根据msgKey的查询逻辑会走RequestCode.QUERY_MESSAGE,代码如下:


@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.QUERY_MESSAGE:
//按照Message Key查询消息
return this.queryMessage(ctx, request);
case RequestCode.VIEW_MESSAGE_BY_ID:
//按照MessageId查询消息
return this.viewMessageById(ctx, request);
default:
break;
    }
return null;
}
  public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException 
       //省略了其它非讲解代码
       final QueryMessageResult queryMessageResult =
this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
requestHeader.getEndTimestamp());
//省略了其它非讲解d代码
    }


在DefaultMessageStore.queryMessage中对消息进行查询,代码如下:


public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {


        //找出所有符合条件的phyOffset,只找maxNum就返回             
        QueryOffsetResult queryOffsetResult =
this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
if (queryOffsetResult.getPhyOffsets().isEmpty()) {
break;
}
        //省略其它代码
        //循环集合取数据
        for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
           //...
         if (match) {
                //根据phyOffset取数据,并把结果封装                
SelectMapedBufferResult result = this.commitLog.getData(offset, false);
if (result != null) {
int size = result.getByteBuffer().getInt(0);
result.getByteBuffer().limit(size);
result.setSize(size);
queryMessageResult.addMessage(result);
}
}
        
}
}



文章转载自不修边幅的创客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论