暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

rocketMq同步刷盘和异步刷盘

易林的博客 2019-10-16
573


在上一篇文章中我们分析完了rocketMq的映射文件的创建和获取以及文件预热相关过程,本篇我们来分析刷盘相关的东西,rocketMq提供了2种刷盘策略,同步刷盘和异步刷盘,首先我们来简单看看rocketMq刷盘的差异性

主要就是同步刷盘需要等到真正的刷盘到磁盘后,然后返回给producer才完成整个消息发送过程,而异步刷盘只需要把消息发送的pacheCache中,然后由异步线程自己完成刷盘

1.同步刷盘

1.1.同步刷盘流程分析

同步刷盘整个过程是消息写入到缓存pageCache后,groupCommitRequest会被唤醒,最终由GroupCommitService完成整个刷盘,为什么这么设计呢,其实在同步刷盘时,如果我们配置不立即刷盘,每次刷盘时会把刷盘请求放入List<GroupCommitRequest中>。然后GroupCommitService线程被唤醒,批量消息刷盘(其实就是从同步退化到异步了,可以简单这么理解)

1.2.源码分析

入口 CommitLog.handleDiskFlush

   public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
//同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
//需要等到刷盘成功
if (messageExt.isWaitStoreMsgOK()) {

//构造刷盘请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//等待刷盘
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
//刷盘超时
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
//相当于是异步刷盘,唤醒相关线程
service.wakeup();
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
//异步刷盘之-开启堆外内存方式
flushCommitLogService.wakeup();
} else {
//异步刷盘之 pacheCache方式
commitLogService.wakeup();
}
}
}

接下来我们分析同步刷盘处理线程,groupCommitService

1.3.GroupCommitService 执行刷盘

       private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
//之所以是循环2次是因为有些messsage可能是在下一个映射文件中,刷盘2次,保证消息都落盘了
for (int i = 0; i < 2 && !flushOK; i++) {

//如果刷盘的位置已经大于了或等于当前请求的偏移,说明,该message已经刷盘
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

//没有刷盘,则执行刷盘
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}
//唤醒前面等待的request线程
req.wakeupCustomer(flushOK);
}

//更新下刷盘的最新时间
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
//清除当前请求
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
//个别mesage设置为不同步的刷盘策略,将会走这条路劲
CommitLog.this.mappedFileQueue.flush(0);
}
}
}

public void run() {
CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}

上面就是整个刷盘的流程,接下来我们继续分析整个刷盘的过程

  //flushLeastPages  0
public boolean flush(final int flushLeastPages) {
boolean result = true;
//根据要刷盘的位置,获取对应的mappedFile
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
//刷盘,并返回刷盘的偏移
int offset = mappedFile.flush(flushLeastPages);
//记录新的刷盘位置
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}

return result;
}




public int flush(final int flushLeastPages) {
//做一些条件判断,看是否刷盘,看下面的分析
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();

try {
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
//开启了堆外内存的方式的刷盘
this.fileChannel.force(false);
} else {
//执行刷盘 ,没有开启堆外内存的方式
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}

this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}


private boolean isAbleToFlush(final int flushLeastPages) {
//获取当当前的刷盘位置
int flush = this.flushedPosition.get();
//获取到写入文件的位置
int write = getReadPosition();

//如果文件已经写满了,执行刷盘
if (this.isFull()) {
return true;
}
//如果 flushLeastPages 延迟刷盘
if (flushLeastPages > 0) {
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
}
//写入的message大于已经刷盘的message,执行刷盘
return write > flush;
}


大家跟着注释读,应该比较清楚,这样我们就分析完了整个同步刷盘的过程

2.异步刷盘

2.1.异步刷盘流程

异步刷盘和同步刷盘不同的是,异步刷盘数据写入到缓存后,就完成了,异步刷盘根据是否开启堆外内存有两种方式:

  1. 一种是基于pacheCache+FlshTimeRealService完成刷盘过程

  2. 另一种是基于TransientStorePoll+CommitRealService-》第一次刷盘到(commit)pacheCache(channel),唤醒FlshTimeRealService,再次执行刷盘,有两次刷盘的动作

2.2.源码分析

入口 CommitLog.handleDiskFlush

   public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
//同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
//省略同步刷盘流程
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
//异步刷盘之-开启堆外内存方式
flushCommitLogService.wakeup();
} else {
//异步刷盘之 pacheCache方式
commitLogService.wakeup();
}
}
}

2.3.堆外内存刷盘 CommitRealTimeService

     @Override
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {

//commit的间隔
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

//每次commit的最小页面
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

//即使没有达到commit页面的要求,超过一定时间也进行刷盘
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

long begin = System.currentTimeMillis();
//就算刷盘的消息的数量达不到,但是时间达到了也进行刷盘
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}

try {
//进行刷盘(其实就是写入pacheCache,还没有真正的刷盘)
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
//如果message写入cache成功,真正执行刷盘
flushCommitLogService.wakeup();
}

if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}

boolean result = false;
//重试刷盘
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
}

上面是推外内存的刷盘过程,接下来我们继续分析mappedFileQueue.commit
    public boolean commit(final int commitLeastPages) {
boolean result = true;
//根据提交位置找到对应的映射文件
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
if (mappedFile != null) {
//数据写入cache
int offset = mappedFile.commit(commitLeastPages);
//记录新的提交位置
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.committedWhere;
this.committedWhere = where;
}

return result;
}

public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}

//一些刷盘条件的判断,我们上面已经分析过了
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
//数据commit
commit0(commitLeastPages);
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}

// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}

return this.committedPosition.get();
}


protected void commit0(final int commitLeastPages) {
//得到写入位置
int writePos = this.wrotePosition.get();
//上一次的写入位置
int lastCommittedPosition = this.committedPosition.get();

if (writePos - this.committedPosition.get() > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
//从上一个位置开始写
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
//写入到pacheCache
this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}


到这里我们已经分析完了整个堆外内存的刷盘过程,还有一部唤醒FlshTimeRealService执行真正的刷盘,我们在接下来进行分析

2.4.pacheCache刷盘(非堆外内存)刷盘 FlushRealTimeService

 public void run() {
CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
//定时刷盘的开关
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
//定时刷盘的间隔
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();

//每次刷盘的最少页面
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

//即使没有达到的刷盘的页面要求,超出一定时间,也要刷盘
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

boolean printFlushProgress = false;

// Print flush progress 打印整个刷盘流程
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
}

try {
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}

if (printFlushProgress) {
this.printFlushProgress();
}

long begin = System.currentTimeMillis();

//刷盘(和上面同步刷盘的过程是一样了,就不再分析了)
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//更新最新的刷盘时间
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}

// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}

this.printFlushProgress();

CommitLog.log.info(this.getServiceName() + " service end");
}

到这里我们就分析完了,整个异步刷盘的流程,下面我们把异步刷盘的流程进行总结下

2.5.总结

构建映射文件mappedFile有两种方式,一种是基于pacheCache(mappedByteBuffer)一种是基于堆外内存池(writeBuffer),mappedByteBuffer可以直接刷盘到disk,而writeBuffer需要经过commit后到fileChannel(pacheCache)然后执行flush到disk

3.TransientStorePool的总结

来自社区胡宗棠老师的总结

一般有两种,有两种方式进行读写

(1)第一种,Mmap+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写。

(2)第二种,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。

备注:参考了社区陈厚道老师官方社区的分享

文章转载自易林的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论