在这样的场景下,NameNode 必然会面临高并发的访问,与此同时,为了保证元数据的安全可靠,不能仅仅保存在内存中,而是需要刷写到磁盘。那么,NameNode 如何在高并发的场景下既保证高性能地处理各种元数据操作,又可以将元数据持久化到磁盘来保证安全?
这里 NameNode 巧妙地使用了 双缓冲 机制。示意图如下:

在分析 双缓冲 机制前,首先了解下 NameNode 是如何对元数据进行持久化的。集群启动时,NN 会将磁盘存储的元数据信息(FsImage 文件)加载到内存,之后所有对元数据的操作都会记录其操作日志,并将这些日志持久化到 EditLog 文件中。后续就可以在 FsImage 的基础上执行 EditLog 中的操作来恢复集群元数据。
如果每次对元数据的操作都进行同步刷盘,性能会很低。这里 NameNode 会先把操作日志写入写缓冲(上图中的Current-Buffer),由于是纯内存操作,速度很快,当日志量达到一定阈值时,进行批量刷盘。为了解决刷盘过程中无法继续执行写操作的问题,NameNode 采用了双缓冲机制,也就是刷盘之前将 Sync-Buffer 和 Current-Buffer 进行交换,Current-Buffer 继续执行写操作,Sync-Buffer 进行刷盘,互不影响,刷盘完成后清空 Sync-Buffer,等待下次交换。
同时,为了提高并发处理能力,采用了分段加锁的方式,对于刷盘这种耗时的操作是不加锁的,其余操作由于是纯内存操作,即使加了锁速度也很快。这样既保证了高性能地处理并发访问,又可以将元数据持久化到磁盘来保证数据安全性。
NameNode 写入缓冲并进行缓冲交换和刷盘的具体逻辑在 FSEditLog.logSync() 方法中,这里采用伪代码对其原理进行说明:
1. 将操作日志封装成 EditLog 类。该类有两个基本属性:
txid 表示操作的事务id
context 表示操作日志的内容
/*** 定义日志类型,每条操作日志就是一个 EditLog 对象*/class EditLog{private long txid ;private String context ;public EditLog(long txid, String context) {this.txid = txid;this.context = context;}//get 和 set 方法@Overridepublic String toString() {return "EditLog{" +"txid=" + txid +", context='" + context + '\'' +'}';}}
class DoubleBuffer{//执行日志写入操作的缓冲private LinkedList<EditLog> currentBufffer = new LinkedList<EditLog>();//执行日志同步刷盘的缓冲private LinkedList<EditLog> syncBufffer = new LinkedList<EditLog>();//get 和 set 方法/*** 向currentBuffer缓冲中写入操作日志* @param log*/public void write(EditLog log){currentBufffer.add(log);}/*** 返回同步缓冲中的最大txid* @return*/public long getLastTxid(){return syncBufffer.getLast().getTxid();}/*** 交换缓冲*/public void setReadyToFlush(){LinkedList<EditLog> tmp = currentBufffer;currentBufffer=syncBufffer;syncBufffer=tmp;}/*** 将同步缓冲中的数据刷盘,这里象征性地打印一下*/public void flush(){for (EditLog editLog : syncBufffer) {System.out.println("存入操作日志:"+editLog);}//清空缓冲syncBufffer.clear();}}
logEdit:该方法对外部开发,用于写入操作日志 logSync:在 logEdit 方法中会调用该方法,用于缓冲交换及日志刷盘
public class DoubleBufferDemo {private long txid = 0L;//双缓冲操作对象private DoubleBuffer buffer = new DoubleBuffer();//标记当前是否正在往磁盘里面刷写数据private volatile Boolean isSyncRunning = false;//标记是否有线程正在等待同步private volatile Boolean isWaitSync = false;//初始化刷写的最大事务idprivate volatile Long syncMaxTxid = 0L;//存储每个线程内部的事务idprivate ThreadLocal<Long> localTxid=new ThreadLocal<Long>();.../*** 写入操作日志* @param content*/public void logEdit(String context){//加锁synchronized (this){//线程1:txid=1//线程2:txid=2//线程3:txid=3txid++;//线程1:localTxid=1//线程2:localTxid=2//线程3:localTxid=3localTxid.set(txid);EditLog editLog = new EditLog(txid, content);//将操作日志写入缓冲buffer.write(editLog);}//释放锁logSync();}/*** 将缓冲中的数据刷盘*/public void logSync(){synchronized (this){//当第一个线程执行到这里,isSyncRunning 为false,不执行分支内的逻辑if(isSyncRunning){//线程二:2//线程三:3long txid = localTxid.get();//当线程2和线程3进来时,由于txid<=syncMaxTxid(也就是3),说明线程1进行刷盘时,sync缓冲中//已经包含了线程2和线程3的操作日志,所以不需要线程2和线程3再进行刷盘,方法直接返回//当线程4进来时,该条件不满足,接着往下执行if(txid <= syncMaxTxid){return;}//线程4执行到这里,isWaitSync=false,继续向下执行//线程5执行到这里,isWaitSync=true,方法返回if(isWaitSync){//线程5直接返回return;}//线程4将isWaitSync赋值为trueisWaitSync = true;//线程4执行到这里,isSyncRunning=true,执行里面的逻辑while(isSyncRunning){try {//线程4就在这儿等:退出等待有两种情况//1)等待时间到了 2)被人唤醒了wait(2000);//wait操作会释放锁}catch (Exception e){e.printStackTrace();}}//线程4结束等待,更新isWaitSync=falseisWaitSync = false;}//进行内存交换//当第一个线程执行缓冲交换时,假设 currentBuffer //中已经完成了线程1,线程2//中已经完成了线程1,线程2线程3 的操作日志写入buffer.setReadyToFlush();//如果同步缓冲中存在操作日志,获取其最大的事务id//这里由于假设线程1-3已经完成了日志写入,所以第一次获取到的最大事务id就是3if(buffer.getSyncBufffer().size() > 0) {syncMaxTxid = buffer.getLastTxid();}//更新标识,表明正在往磁盘中同步数据isSyncRunning = true;} //线程1 释放锁/*** 分段加锁* 线程1 刷写数据* 刷盘相对较慢*///这里耗时的刷盘操作并未加锁buffer.flush();//重新加锁synchronized (this) {//线程1刷写磁盘完成,将isSyncRunning 更新为falseisSyncRunning = false;//唤醒等待线程。notifyAll();}}}

线程1 调用 logEdit 方法,获取锁并将操作日志写入currentBuffer,之后释放锁并调用 logSync 方法
线程1 释放锁后,线程2,线程3依次获取锁,将操作日志写入 currentBuffer
线程1 执行 logSync 方法,首先会获取锁,由于 isSyncRunning 初始值为 false,if 条件不满足,直接跳过该分支执行缓冲交换(注意此时交换后的 syncBuffer 中包含线程2 和线程3 的操作日志),获取 syncBuffer 中的最大事务syncMaxTxid=3,然后将 isSyncRunning 标识更新为 true ,表示有线程在进行刷盘操作
线程1 释放锁并执行刷盘操作,该操作耗时相对较长
线程2 执行 logSync 方法,此时 isSyncRunning=true,进入if 分支,由于线程2的事务id=2,小于等于 syncMaxTxid,线程2直接返回
线程3 执行 logSync 方法,和线程2 同理,线程3直接返回

由于线程1 的刷盘操作耗时较长,此时线程4,线程5,线程6依次调用 logEdit 方法将操作日志写入 currentBuffer(线程4 写入前已经进行了缓冲交换)
线程4 执行 logSync 方法,获取锁后进入 if 分支,由于 isWaitSync=false,线程4 不会进入 if(isWaitSync) 分支,之后将 isWaitSync 更新为 true,由于此时 isSyncRunning=true,线程4 调用 wait 方法进行等待并释放锁
线程5 调用 logSync 方法,获取锁后进入 if 分支,由于 isWaitSync=true,线程5 进入if(isWaitSync) 分支后直接返回
线程6 调用 logSync 方法,获取锁后进入if 分支,和线程5 同理直接返回
线程4 由于调用了 wait 方法,只能等待时间到达或者被其它线程唤醒
假设此时线程1 执行刷盘操作完毕,再次获取锁后将标记 isSyncRunning 置为 false,表示刷盘操作已完成,并调用 notifyAll 方法唤醒等待的线程
线程4 被线程1 唤醒,将标识 isWaitSync 置为 false,表示已经没有线程等待刷盘了,之后进行缓冲交换,继续执行上面的流程(此时线程7,线程8等已经将操作日志写入 currentBuffer)
NameNode 通过双缓冲的机制,将写内存和刷写磁盘的操作进行分离,互不影响。 NameNode 通过分段加锁来减小锁的粒度,即使在高并发场景下,加锁操作都是纯内存操作,性能很高。而对于相对耗时的刷盘操作,由于该操作未加锁,某个线程刷盘时并不影响其它线程将操作日志写入写缓冲。




