点击上方蓝色字体,选择“设为星标”
01 msgKey索引生成
在讲之前,我们需要知道生产者(producer)允许针对某条消息设置KEYS,具体可以参考Producer代码和MessageConst代码,其中KEYS可以是有多个关键词构成并且多个key之间使用空格分开的.要知道这一点,因为msgKey索引就是基于这些key生成的.
在前面的文章中:msgId查询逻辑 讲过了生产者生产一条消息到落盘的过程,在上一篇的基础上,我们继续读一些关键代码,然后看看msgKey的生成逻辑.当一条消息落盘成功之后,在CommitLog.putMessage(final MessageExtBrokerInner msg)中消息的相关信息会被封装成: DispatchRequest对象,然后丢给DispatchMessageService进行分发.如下代码所示:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {//省略其它非讲解代码//往文件中追加消息result = mapedFile.appendMessage(msg, this.appendMessageCallback);//省略其它非讲解d代码//如果追加成功了,则封装消息到DispatchRequestDispatchRequest dispatchRequest = new DispatchRequest(//topic,// 1queueId,// 2result.getWroteOffset(),// 3result.getWroteBytes(),// 4tagsCode,// 5msg.getStoreTimestamp(),// 6result.getLogicsOffset(),// 7msg.getKeys(),// 8/*** Transaction*/msg.getSysFlag(),// 9msg.getPreparedTransactionOffset());// 10//交给DispachMessageService进行分发this.defaultMessageStore.putDispatchRequest(dispatchRequest);//省略其它非讲解d代码}
public void putDispatchRequest(final DispatchRequest dispatchRequest) {//调用DispatchMessageService的putRequest进行分发this.dispatchMessageService.putRequest(dispatchRequest);}
在DispatchMessageService.putRequest中,dispatchRequest被放到了List<DispatchRequest> requestWrtie集合中, DispatchMessageService线程在后台一直运行,不断执行doDispatch(),如下代码所示:
public void putRequest(final DispatchRequest dispatchRequest) {int requestsWriteSize = 0;int putMsgIndexHightWater =DefaultMessageStore.this.getMessageStoreConfig().getPutMsgIndexHightWater();synchronized (this) {//把分发请求放到集合中this.requestsWrite.add(dispatchRequest);requestsWriteSize = this.requestsWrite.size();if (!this.hasNotified) {this.hasNotified = true;this.notify();}}}
privatevoid doDispatch() {if (!this.requestsRead.isEmpty()) {for (DispatchRequest req : this.requestsRead) {final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());// 1、分发消息位置信息到ConsumeQueueswitch (tranType) {case MessageSysFlag.TransactionNotType:case MessageSysFlag.TransactionCommitType:// 将请求发到具体的Consume QueueDefaultMessageStore.this.putMessagePostionInfo(req.getTopic(), req.getQueueId(),req.getCommitLogOffset(), req.getMsgSize(), req.getTagsCode(),req.getStoreTimestamp(), req.getConsumeQueueOffset());break;case MessageSysFlag.TransactionPreparedType:case MessageSysFlag.TransactionRollbackType:break;}}//消息发送到IndexService服务,构建msgKey的索引if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {DefaultMessageStore.this.indexService.putRequest(this.requestsRead.toArray());}this.requestsRead.clear();}}
在IndexService的putRequest中,会把请求放到一个LinkedBlockingQueue阻塞队列中,然后由IndexService线程不停的从该队列中取消息,然后建立索引,其中该队列长度30万条消息,如果超过,会丢失,所以基于msgKey有可能存在查不到的情况.代码如下:
private LinkedBlockingQueue<Object[]> requestQueue = new LinkedBlockingQueue<Object[]>(300000);
/*** 向队列中添加请求,队列满情况下,丢弃请求*/public void putRequest(final Object[] reqs) {boolean offer = this.requestQueue.offer(reqs);if (!offer) {if (log.isDebugEnabled()) {log.debug("putRequest index failed, {}", reqs);}}}
@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStoped()) {try {//线程不停的从队列中poll消息,然后建立索引Object[] req = this.requestQueue.poll(3000, TimeUnit.MILLISECONDS);if (req != null) {//不为空,则建立索引this.buildIndex(req);}}catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info(this.getServiceName() + " service end");}
public void buildIndex(Object[] req) {boolean breakdown = false;//获取当前正在写入的索引文件IndexFile indexFile = retryGetAndCreateIndexFile();if (indexFile != null) {long endPhyOffset = indexFile.getEndPhyOffset();//循环所有消息,为每一条消息建立索引MSG_WHILE: for (Object o : req) {DispatchRequest msg = (DispatchRequest) o;String topic = msg.getTopic();//取出当前消息的所有的keys,keys在文章的最前面说过,每个key用空格分开 String keys = msg.getKeys();if (msg.getCommitLogOffset() < endPhyOffset) {continue;}//如果不为空,使用空格切割字符串,if (keys != null && keys.length() > 0) {String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);//循环key,为当前消息构建索引for (String key : keyset) {// TODO 是否需要TRIMif (key.length() > 0) {//除了topic和key,还有消息的commitLogoffset和消息的时间戳,构建索引//所以msgKey支持范围查询for (boolean ok =indexFile.putKey(buildKey(topic, key), msg.getCommitLogOffset(),msg.getStoreTimestamp()); !ok;) {log.warn("index file full, so create another one, " + indexFile.getFileName());indexFile = retryGetAndCreateIndexFile();if (null == indexFile) {breakdown = true;break MSG_WHILE;}ok =indexFile.putKey(buildKey(topic, key), msg.getCommitLogOffset(),msg.getStoreTimestamp());}}}}}
private String buildKey(final String topic, final String key) {return topic + "#" + key;}
在IndexFile的putKey中,计算key(topic + "#" + key)、phyOffset和storeTimeStamp的哈希值,然后写入到索引文件MappedByteBuffer中,至此基于消息keys的索引就建立好了,代码如下:
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {//省略了其它非讲解代码// 写入真正索引this.mappedByteBuffer.putInt(absIndexPos, keyHash);this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//省略其它非讲解代码}
基于msgKey的消息索引的建立流程已经讲过,那如何根据topic和msgKey对消息进行检索呢?
02 使用msgKey检索消息
如下图所示,RocketMQ的console中提供了基于msgKey查询的接口,topic和key必须填写,如下图所示:

提交之后,最终被QueryMsgByKeySubCommand的queryByKey方法执行(RocketMQ Console引入里rocketmq客户端端),默认会加上三个默认参数 :
参数1 : 最大返回消息数 : 默认64
参数2 : 开始时间戳 : 0
参数3 : 结束时间戳 : Long.MAX_VALUE
代码展示如下:
voidqueryByKey(final DefaultMQAdminExt admin, final String topic, final String key)throws MQClientException, InterruptedException {admin.start();//查询消息QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);System.out.printf("%-50s %4s %40s\n",//"#Message ID",//"#QID",//"#Offset");for (MessageExt msg : queryResult.getMessageList()) {System.out.printf("%-50s %4d %40d\n", msg.getMsgId(), msg.getQueueId(), msg.getQueueOffset());}}
经过一系列的方法调用之后,我们可以看MQAdminImpl.queryMessage,在该方法中,获取当前topic的路由信息(这里注意,RocketMQ Console项目会引入rocketmq客户端,并启动MQClientInstance并与nameserver交互),解析出当前topic所在的broker地址,循环broker地址,封装请求信息:topic、key、maxNum、begin和end信息去broker请求信息,然后把每个broker的查询结果封装到List中,返回给用户,代码如下:
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)throws MQClientException, InterruptedException {//获取路由信息TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);if (null == topicRouteData) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);}//解析出当前topic所在的broker地址if (topicRouteData != null) {List<String> brokerAddrs = new LinkedList<String>();for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {String addr = brokerData.selectBrokerAddr();if (addr != null) {brokerAddrs.add(addr);}}if (!brokerAddrs.isEmpty()) {final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());//封装所有结果final List<QueryResult> queryResultList = new LinkedList<QueryResult>();//循环每个broker地址,封装请求信息,建立链接,获取消息for (String addr : brokerAddrs) {try {//封装请求信息QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();requestHeader.setTopic(topic);requestHeader.setKey(key);requestHeader.setMaxNum(maxNum);requestHeader.setBeginTimestamp(begin);requestHeader.setEndTimestamp(end);//请求brokerthis.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader,1000 * 15, new InvokeCallback() {public void operationComplete(ResponseFuture responseFuture) {//省略其它非讲解d代码List<MessageExt> wrappers =MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);//封装结果QueryResult qr =new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);//List 保存结果queryResultList.add(qr);//省略其它非讲解代码}}//省略了很多代码}}}
在broker上,有单独的类来处理查询消息的请求,跟msgId的位置一样,在类QueryMessageProcessor的processRequest中,根据msgKey的查询逻辑会走RequestCode.QUERY_MESSAGE,代码如下:
@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.QUERY_MESSAGE://按照Message Key查询消息return this.queryMessage(ctx, request);case RequestCode.VIEW_MESSAGE_BY_ID://按照MessageId查询消息return this.viewMessageById(ctx, request);default:break;}return null;}
public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException//省略了其它非讲解代码final QueryMessageResult queryMessageResult =this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),requestHeader.getEndTimestamp());//省略了其它非讲解d代码}
在DefaultMessageStore.queryMessage中对消息进行查询,代码如下:
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {//找出所有符合条件的phyOffset,只找maxNum就返回QueryOffsetResult queryOffsetResult =this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);if (queryOffsetResult.getPhyOffsets().isEmpty()) {break;}//省略其它代码//循环集合取数据for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {//...if (match) {//根据phyOffset取数据,并把结果封装SelectMapedBufferResult result = this.commitLog.getData(offset, false);if (result != null) {int size = result.getByteBuffer().getInt(0);result.getByteBuffer().limit(size);result.setSize(size);queryMessageResult.addMessage(result);}}}}




