RocketMQ Source Code Series (10): Long Polling and FileRegion

Yilin's Blog (易林的博客) 2019-10-23

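The previous article analyzed RocketMQ's retry messages and message filtering. This one covers long polling and the FileRegion optimization.

Before getting into long polling, consider a simple setup: one server and one client, where the client sends requests to pull data and the server responds. Normally this works fine. But suppose the server temporarily has no more data. If the client keeps pulling anyway, it generates a flood of empty requests and responses that wastes network bandwidth and puts load on the whole server. How does RocketMQ handle this when pulling messages? When there is no message, the broker holds the request and only responds to the client once a message arrives or the hold times out.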
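To make the idea concrete before reading the RocketMQ source, here is a minimal sketch of a long-polling server in plain Java. It is not RocketMQ code: `LongPollServer`, `pull` and `publish` are hypothetical names, and `completeOnTimeout` assumes Java 9 or later.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

/** Toy long-polling server: a pull that finds no data is held until data arrives or it times out. */
class LongPollServer {
    private final Queue<CompletableFuture<Optional<String>>> waiters = new ConcurrentLinkedQueue<>();
    private final Queue<String> messages = new ConcurrentLinkedQueue<>();

    /** Answers immediately if a message is queued; otherwise holds the request. */
    CompletableFuture<Optional<String>> pull(long holdMillis) {
        String msg = messages.poll();
        if (msg != null) {
            return CompletableFuture.completedFuture(Optional.of(msg));
        }
        CompletableFuture<Optional<String>> held = new CompletableFuture<>();
        // If nothing arrives in time, answer with "no message" instead of hanging forever.
        held.completeOnTimeout(Optional.empty(), holdMillis, TimeUnit.MILLISECONDS);
        // Toy simplification: a publish racing between poll() above and add() here could be missed.
        waiters.add(held);
        return held;
    }

    /** A new message wakes one held request, or is queued if nobody is waiting. */
    void publish(String msg) {
        CompletableFuture<Optional<String>> held;
        while ((held = waiters.poll()) != null) {
            // complete() returns false when the waiter already timed out; try the next one.
            if (held.complete(Optional.of(msg))) {
                return;
            }
        }
        messages.add(msg);
    }
}
```

The client sees one request and one response either way; the difference from short polling is only that an empty answer is delayed by up to `holdMillis`.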

1.Long polling

Since this is about message pulling, the analysis starts where messages are pulled.

1.1.Message pull

Entry point: DefaultMQPushConsumerImpl.pullMessage

 //A lot of code is omitted here because it is not the focus; see the earlier message-pull analysis if needed

//Callback for the pull
PullCallback pullCallback = new PullCallback() {
//Success
@Override
public void onSuccess(PullResult pullResult) {

}

//Failure
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
//Key point: on an exception, the pull request is retried after a short delay
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
try {
//Pull the messages
//BROKER_SUSPEND_MAX_TIME_MILLIS: maximum time the broker may suspend the request (15 s)
//ASYNC: asynchronous pull
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);

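Most of the code is omitted to keep the analysis manageable; for message sending, see the earlier article on that topic. The thing to notice here is the callback: if an exception occurs, the client waits a moment and then pulls again. So the question is when onException is invoked, that is, when the pull counts as failed.

Let's keep following the pull.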

MQClientAPIImpl.pullMessageAsync

  private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
)
throws RemotingException, InterruptedException
{
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}


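This is the callback that runs when the pull completes. If the ResponseFuture carries no response (the send failed, or the client-side timeout expired), the exception path is taken. The next step is to see what the broker does when it finds no message, and whether the client ever observes that as a missing response.

That was the client side of the pull, which is fairly simple. Next, the broker side.

1.2.Broker-side handling of a pull

Entry point: PullMessageProcessor.processRequest(ctx, r)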

 public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request)
throws RemotingCommandException
{
return this.processRequest(ctx.channel(), request, true);
}

This is where the broker receives the consumer's pull request. Note that the last argument is true (brokerAllowSuspend). Read on.

 private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);


//Unrelated code omitted

final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);


switch (response.getCode()) {
case ResponseCode.SUCCESS:
//Unrelated code omitted

break;
//No message found
case ResponseCode.PULL_NOT_FOUND:

if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
//Topic
String topic = requestHeader.getTopic();
//Offset
long offset = requestHeader.getQueueOffset();
//Queue id
int queueId = requestHeader.getQueueId();
//Wrap the original request in a broker-side PullRequest
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//Park the request in pullRequestTable
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}

}


return response;
}


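So when no message is found, processRequest returns null. A null return means the broker writes nothing back for now: the request is parked on the broker for up to BROKER_SUSPEND_MAX_TIME_MILLIS (15 seconds), and the client's ResponseFuture simply stays pending. The client only takes the exception path from 1.1 if nothing comes back before its own timeout, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND.

That raises the question of how RocketMQ deals with these parked requests.

1.3.How RocketMQ handles the new PullRequest

First, how the new PullRequest is created and stored.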

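     //requestCommand: the original pull request
//clientChannel: the channel the request came in on
//timeoutMillis: how long the request may be held
//suspendTimestamp: the time at which the request was suspended
//pullFromThisOffset: the offset to pull from
//subscriptionData: the subscription
//messageFilter: the filter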
public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
long pullFromThisOffset, SubscriptionData subscriptionData,
MessageFilter messageFilter)


That is the PullRequest constructor; the comments describe each argument.

PullRequestHoldService

    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
//key: topic@queueId
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
//Add the request to the list stored in the map
mpr.addPullRequest(pullRequest);
}

    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);

That is how RocketMQ handles a pull that found nothing. The data lives in a map, and each key holds all pending requests for the same topic and the same queue.
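The shape of that table can be sketched in a few lines. This is a simplified stand-in, not the broker's classes: `HoldTable` and `HeldRequest` are hypothetical, the `@` separator is taken from the `topic@queueId` comment on pullRequestTable, and `computeIfAbsent` replaces the get/putIfAbsent sequence in the original.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Simplified stand-in for the hold table: one list of parked requests per topic@queueId. */
class HoldTable {
    /** Hypothetical, stripped-down version of a parked pull request. */
    static final class HeldRequest {
        final String consumer;
        final long pullFromThisOffset;

        HeldRequest(String consumer, long pullFromThisOffset) {
            this.consumer = consumer;
            this.pullFromThisOffset = pullFromThisOffset;
        }
    }

    private final ConcurrentMap<String, List<HeldRequest>> table = new ConcurrentHashMap<>(1024);

    static String buildKey(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    /** Parks a request under its topic@queueId key, creating the list on first use. */
    void suspend(String topic, int queueId, HeldRequest request) {
        List<HeldRequest> list = table.computeIfAbsent(buildKey(topic, queueId), k -> new ArrayList<>());
        synchronized (list) {
            list.add(request);
        }
    }

    /** Returns a copy of everything parked under the key and clears the stored list. */
    List<HeldRequest> takeAll(String topic, int queueId) {
        List<HeldRequest> list = table.get(buildKey(topic, queueId));
        if (list == null) {
            return Collections.emptyList();
        }
        synchronized (list) {
            List<HeldRequest> copy = new ArrayList<>(list);
            list.clear();
            return copy;
        }
    }
}
```

Keying by topic and queue means a newly arrived message only has to look at the requests that could actually be interested in it.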

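That covers how the new PullRequest is stored. The requests are now parked, so when are they released? That is next.

1.4.Releasing the held PullRequest

Start from the arrival of a new message. It is appended to the commitLog, possibly still only in the page cache at that point, and then the consumeQueue and index are built from it.

Entry point: ReputMessageService.doReput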

  private void doReput() {

if (result != null) {
try {

if (dispatchRequest.isSuccess()) {
if (size > 0) {
//Build the consumeQueue and index
DefaultMessageStore.this.doDispatch(dispatchRequest);

if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&&
DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {

//A listener is notified here
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}


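A lot of unrelated code is omitted above. The point to notice is that once the consumeQueue has been built, a listener is invoked. It signals that a new message has arrived and that the held requests can be released.

Next, step into it to see how the notification works.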

NotifyMessageArrivingListener.arriving

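  //topic: the message's topic
//queueId: the queue id
//logicOffset: the queue's new max offset (the caller passes consumeQueueOffset + 1)
//tagsCode: hash code of the message tag
//msgStoreTime: the time the message was stored
//filterBitMap: bitmap used for filtering
//properties: message properties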
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
msgStoreTime, filterBitMap, properties);
}

This code is simple enough to skip. Continue into notifyMessageArriving.

 public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
//Build the key, the same way as when the PullRequest was stored
String key = this.buildKey(topic, queueId);

//Fetch all PullRequests held under this key
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
//Copy the list and clear the original
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
//Iterate over the PullRequests
for (PullRequest request : requestList) {

long newestOffset = maxOffset;
//Compare the max offset with the offset the request wants to pull from
if (newestOffset <= request.getPullFromThisOffset()) {
//Re-read the queue's max offset
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
//If the max offset is past the request's offset, new messages have arrived and can be consumed
if (newestOffset > request.getPullFromThisOffset()) {
//Apply the filter, for example on the tag hash code
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}

if (match) {
try {
//Wake the waiting request; analyzed separately below
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
//If the request has been held for at least its timeout, it has waited long enough and is released too (this catches requests that would otherwise stay held for some reason)
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}

replayList.add(request);
}

if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}

To summarize: when a message arrives, the message's topic and queue id form the key used to fetch all waiting PullRequests, which are then released. How the release works is covered shortly.
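Stripped of filtering and error handling, the decision made for each held request comes down to two checks. The sketch below is a reduction of notifyMessageArriving under that assumption; `WakeDecision`, `Held` and `onArrival` are hypothetical names, and the tag and bitmap filter step is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Reduced form of the per-request decision: wake on new data or on timeout, else keep holding. */
class WakeDecision {
    /** Hypothetical, stripped-down held request. */
    static final class Held {
        final String id;
        final long pullFromThisOffset;
        final long suspendTimestamp;
        final long timeoutMillis;

        Held(String id, long pullFromThisOffset, long suspendTimestamp, long timeoutMillis) {
            this.id = id;
            this.pullFromThisOffset = pullFromThisOffset;
            this.suspendTimestamp = suspendTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    /** Returns the requests to wake; requests that must keep waiting are appended to stillHeld. */
    static List<Held> onArrival(List<Held> held, long maxOffset, long now, List<Held> stillHeld) {
        List<Held> wake = new ArrayList<>();
        for (Held request : held) {
            // New data exists once the queue's max offset has moved past the requested offset.
            boolean hasNewData = maxOffset > request.pullFromThisOffset;
            // A request held for its full timeout is released even without data.
            boolean expired = now >= request.suspendTimestamp + request.timeoutMillis;
            if (hasNewData || expired) {
                wake.add(request);
            } else {
                stillHeld.add(request);
            }
        }
        return wake;
    }
}
```

In the original, the requests that stay held are the ones collected in replayList and put back into the ManyPullRequest.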

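There is a question worth asking here, because this is a multithreaded program. Consider this case:

the arriving listener fires before the pull request has been stored in pullRequestTable. Is that request never notified? No. RocketMQ accounts for this: besides message arrival, there is one more trigger.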

PullRequestHoldService.run

 @Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}

long beginLockTimestamp = this.systemClock.now();
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}

log.info("{} service end", this.getServiceName());
}


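The method to focus on is checkHoldRequest. RocketMQ runs a separate thread that scans at intervals (every 5 seconds when long polling is enabled) for PullRequests that have piled up. Here is checkHoldRequest.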

    private void checkHoldRequest() {
//Iterate over every key in the table
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
//Get the consumeQueue's max offset
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
//Notify again
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}

This guarantees that no request stays held forever.
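A single pass of that scan can be sketched as follows. This is an illustration rather than the broker's code: `HoldScanner` and `scanOnce` are hypothetical, the map of waiting offsets stands in for the real table of ManyPullRequest objects, and the offset comparison that the original performs inside notifyMessageArriving is folded into the scan for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongBiFunction;

/** One pass of a periodic safety-net scan over keys of the form topic@queueId. */
class HoldScanner {
    static final String SEPARATOR = "@"; // assumed from the topic@queueId comment on pullRequestTable

    /**
     * @param waitingOffsetByKey key -> offset a held request is waiting for
     * @param maxOffsetInQueue   lookup of the current max offset for (topic, queueId)
     * @return keys whose queue has advanced past the waiting offset
     */
    static List<String> scanOnce(Map<String, Long> waitingOffsetByKey,
                                 ToLongBiFunction<String, Integer> maxOffsetInQueue) {
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Long> entry : waitingOffsetByKey.entrySet()) {
            String[] parts = entry.getKey().split(SEPARATOR);
            if (parts.length != 2) {
                continue; // keys that do not split into exactly two parts are skipped, as in the original
            }
            String topic = parts[0];
            int queueId = Integer.parseInt(parts[1]);
            long maxOffset = maxOffsetInQueue.applyAsLong(topic, queueId);
            if (maxOffset > entry.getValue()) {
                ready.add(entry.getKey());
            }
        }
        return ready;
    }
}
```

Because the scan re-reads the max offset from the store, it also catches a message that arrived just before a request was parked.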

  public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}

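Note that the trailing arguments are null or 0. notifyMessageArriving is not repeated here. With these arguments the periodic check carries no tag information about any particular message; as far as I can tell from ExpressionMessageFilter, a null tagsCode is treated as a match. So what the check effectively does is compare offsets: a request whose offset is now behind the queue's max offset is woken, and any other request goes through the time check and is woken once it has been held past its timeout.

Next, how does the wake-up actually work?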

PullMessageProcessor.executeRequestWhenWakeup

  //channel: the client channel
//request: the original pull request
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request)
throws RemotingCommandException
{
Runnable run = new Runnable() {
@Override
public void run() {
try {
//Run the pull again. Note that the last argument is false; on the first pull it was true
final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
//If the response is not null
if (response != null) {
response.setOpaque(request.getOpaque());
response.markResponseType();
try {
//Write the response back; it reaches the client-side callback analyzed above
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}

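At this point it is mostly clear how a held request is released: the pull is executed again, and if the response is not null it is written back to the consumer, whose callback learns that messages were pulled.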
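The role of the boolean flag is easiest to see in isolation. Below is a small sketch of the two passes, assuming a hypothetical `TwoPassPull` class whose `process` method mimics only the suspend decision of processRequest; the `Code` enum is a stand-in, not RocketMQ's ResponseCode.

```java
import java.util.ArrayList;
import java.util.List;

/** Mimics only the suspend decision: park on the first pass, always answer on the wake-up pass. */
class TwoPassPull {
    enum Code { SUCCESS, PULL_NOT_FOUND }

    final List<String> held = new ArrayList<>();
    long maxOffset = 0;

    /** Returns a response code, or null when the request was parked and nothing is written back yet. */
    Code process(String requestId, long offset, boolean brokerAllowSuspend) {
        if (offset < maxOffset) {
            return Code.SUCCESS; // there is at least one message at or after the requested offset
        }
        if (brokerAllowSuspend) {
            held.add(requestId); // first pass: park the request instead of answering
            return null;
        }
        return Code.PULL_NOT_FOUND; // wake-up pass: never park again, report "nothing new"
    }
}
```

Passing false on the wake-up pass is what bounds the hold: a request is parked at most once, so a timed-out request always produces a response.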

Now back to the pull itself.

  private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException
{
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

response.setOpaque(request.getOpaque());



switch (response.getCode()) {

case ResponseCode.SUCCESS:

this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getMessageCount());

this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());

this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
//With isTransferMsgByHeap set, the data is sent to the consumer by ordinary copying
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
//readGetMessageResult allocates a new buffer and copies the messages into it
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
(int) (this.brokerController.getMessageStore().now() - beginTimeMills));
response.setBody(r);
} else {
try {
//Otherwise hand the result to Netty as a FileRegion, skipping the copy into a heap byte[]
FileRegion fileRegion =
new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}

response = null;
}
break;
case ResponseCode.PULL_NOT_FOUND:

if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}

String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}


return response;
}

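The code above is not analyzed line by line; see the earlier articles for the pull path. The point to be clear on is that brokerAllowSuspend is false this time. If the second attempt still finds nothing, no new PullRequest is built, so the returned response is not null (it carries PULL_NOT_FOUND) and the client handles it through the normal onSuccess path. When messages are found, the result depends on isTransferMsgByHeap. If it is set (the default), the data is copied: first into heap memory, then again into the socket buffer. Otherwise a FileRegion is used to avoid that copy.

How exactly is the zero copy done? The FileRegion goes through the Netty pipeline of the NioSocketChannel, and HeadContext's write and flush finally perform the NIO operation. I am not very familiar with Netty, so the analysis stops here for now.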
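For readers who want something concrete, here is the general idea behind a file-region transfer in plain JDK NIO. It is not RocketMQ's ManyMessageTransfer and not Netty code; `TransferToDemo`, `send` and `roundTrip` are hypothetical names, and the only real API relied on is `FileChannel.transferTo`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Sends a file to a channel with FileChannel.transferTo, without reading it into a byte[] first. */
class TransferToDemo {
    /** Transfers the whole file; transferTo may move fewer bytes than asked, so loop until done. */
    static long send(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = in.size();
            while (position < size) {
                // Whether the kernel can avoid user-space copies depends on the target channel and platform.
                position += in.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    /** Writes content to a temp file, sends it into an in-memory sink, and returns what arrived. */
    static String roundTrip(String content) {
        try {
            Path file = Files.createTempFile("transfer-demo", ".bin");
            try {
                Files.write(file, content.getBytes(StandardCharsets.UTF_8));
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                send(file, Channels.newChannel(sink));
                return new String(sink.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The in-memory sink is only there to make the sketch runnable; the copy is only avoided when the target is something like a socket channel.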

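Finally, here is a diagram summarized by Fei Hongjian (费红健).

After following the whole analysis above, the diagram should be easy to understand.

2.Summary

That completes the analysis of the whole long-polling process. FileRegion was only touched on briefly; I will fill it in when I have time. Thanks for reading.

Notes

This article draws on Fei Hongjian's blog: https://www.jianshu.com/u/81e2cd2747f1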

