一、相关概述
1、hbase存储结构
当我们put一条数据到hbase的时候,Client访问zookeeper,从hbase:meta表中获取表位于那个HRegionServer,通过刚刚获取的地址访问对应的regionserver,拿到对应的表存储的regionserver,访问表所在regionServer插入数据,首先向wal写入,然后向memstore中写入数据,当memstore写入的值变多,触发溢写操作(flush),进行文件的溢写,成为一个StoreFile(HFile)当溢写的文件过多时,会触发文件的合并(Compact)操作,合并有两种方式(major,minor)
2、各个架构组件之间的关系
HRegionServer一般和DataNode在同一台机器上运行,实现数据的本地性。一个HRegionServer由若干个Region组成Region是一个Table中的一个Region在一个HRegionServer中的表达,Region由多个Store组成,一个列族又对应一个store,Store由一个MemStore 和0个或多个StoreFile(Hfile)组成

img
二、Memstore
单个Region内部的结构

img
一个Region包含有多个Store:每一个Region内都包含有多个Store实例。一个Store对应一个列族的数据,如果一个表有两个列族,那么在一个Region里面就有两个Store。Memstore的实现目的不是加速数据写入,而是维持数据结构。由于HDFS文件不支持修改,为了维持HBase中的数据是按rowkey顺序来存储的,所以使用Memstore先对数据进行整理排序后再持久化到HDFS上。
2.MemStore Flush的触发时机
Region 中所有 MemStore 占用的内存超过相关阈值
每当client调用put,delete等操作,都会检查hbase.hregion.memstore.flush.size 这个参数,默认128M,
当region中的所有的MemStore 占用 的内存(包括堆内存和堆外内存)的大小超过128M的阈值的时候都会
触发一次刷写;当数据操作速度很快的时,达到了
hbase.hregion.memstore.flush.size * hbase.hregion.memstore.block.multiplier(默认值4),
也就是512M,会在刷写的同时阻塞所有写入该store的写请求,这时往这个Store写数据将会出现 RegionTooBusyException 。
整个 RegionServer 的 MemStore 占用内存总和大于相关阈值
hbase中RegionServer写缓存的大小,占RegionServer 整个 JVM 内存使用量的 40%:RegionServer 占用的堆内存大小为:
(hbase_heapsize)* hbase.regionserver.global.memstore.size(默认值是 0.4)当整个RegionServer的MemStore占用内存总大小大于写缓存的0.95倍时,触发RegionServer级别的Flush
hbase.regionserver.global.memstore.size.lower.limit(默认值0.95)
* hbase.regionserver.global.memstore.size(默认值0.4)
* hbase_heapsize(HRegionServer总内存大小)
类似于单个memstore大小达到某个阀值会阻塞写入,全局的memstore的大小达globalMemStoreSize也会阻塞写入。举个例子,比如你配置的
hbase.regionserver.global.memstore.size.lower.limit是0.95,
hbase.regionserver.global.memstore.size是0.4
堆内存总共是16G,那么触发刷写的阈值是16*0.4*0.95=6.08G触发阻塞的阈值是:0.99.0 版本之后HBase直接用
hbase.regionserver.global.memstore.size(0.4)*hbase_heapsize
(HRegionServer总内存大小)来控制阻塞阈值了16*0.4=6.4G
所以,当memstore的大小达到6.08GB的时候会强制刷写,当memstore的大小达到6.4GB的时候就会阻塞整个HBase集群的写入。如果达到了 RegionServer 级别的 Flush,那么当前 RegionServer 的所有写操作将会被阻塞,而且这个阻塞可能会持续到分钟级别。
WAL的数量大于maxLogs
WAL(Write-ahead log,预写日志)用来解决宕机之后的操作恢复问题的。数据到达 Region 的时候是先写入 WAL,然后再被写到 Memstore 的。如果 WAL 的数量越来越大,这就意味着 MemStore 中未持久化到磁盘的数据越来越多。当 RS 挂掉的时候,恢复时间将会变成,所以有必要在 WAL 到达一定的数量时进行一次刷写操作。这个阈值(maxLogs)的计算公式如下:
this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f);
this.logrollsize = (long)(this.blocksize * multiplier);
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir)
throws IOException {
return conf.getLong("hbase.regionserver.hlog.blocksize",
CommonFSUtils.getDefaultBlockSize(fs, dir) * 2);
}
private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
}当某个regionserver上的所有WAL文件数达到hbase.regionserver.max.logs(默认是32)时,该regionserver上的memstores会发生一次flush,以减少wal文件的数目,此时flush的目的是控制wal文件的个数,以保证regionserver的宕机恢复时间可控。也就是说,如果设置了 hbase.regionserver.maxlogs,那就是这个参数的值;否则是 max(32, hbase_heapsize * hbase.regionserver.global.memstore.size * 2 / logRollSize)。如果某个 RegionServer 的 WAL 数量大于 maxLogs 就会触发 MemStore 的刷写。当WAL文件的数量大于这个值后会触发memstore的刷写,以便创造新的memstore内存空间用来加载WAL中的数据,同时HBase会给出一个
info级别的日志:LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
"; forcing flush of " + regions.length + " regions(s): " + sb.toString());FsHlog.java中的源码
/**
* If the number of un-archived WAL files is greater than maximum allowed, check the first
* (oldest) WAL file, and returns those regions which should be flushed so that it can
* be archived.
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
* @throws IOException
*/
//如果未存档的WAL文件的数量大于允许的最大值,检查第一个(最旧的)WAL文件,并返回应刷新的区域,以便可以将其存档。
byte[][] findRegionsToForceFlush() throws IOException {
byte [][] regions = null;
int logCount = getNumRolledLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
this.byWalRegionSequenceIds.firstEntry();
regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue());
}
if (regions != null) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < regions.length; i++) {
if (i > 0) sb.append(", ");
sb.append(Bytes.toStringBinary(regions[i]));
}
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
"; forcing flush of " + regions.length + " regions(s): " + sb.toString());
}
return regions;
}Memstore达到刷写时间间隔
如果我们很久没有对 HBase 的数据进行更新,这时候就可以依赖定期刷写策略了。RegionServer 在启动的时候会启动一个线程 PeriodicMemStoreFlusher 每隔 hbase.server.thread.wakefrequency 时间去检查属于这个 RegionServer 的 Region 有没有超过一定时间都没有刷写,这个时间是由 hbase.regionserver.optionalcacheflushinterval 参数控制的,默认是 3600000,也就是1小时会进行一次刷写。如果设定为0,则意味着关闭定时自动刷写。也就是说如果以上的所有条件都没有被触发到的话,memstore还是
会每隔一个小时刷写一次,并生成一个HFile。那么HFile岂不是会很
多?所以HBase有了一个必不可缺的功能,那就是HFile(StoreFile)
的合并(Compaction)
为了防止一次性有过多的 MemStore 刷写,定期自动刷写会有 0 ~ 5 分钟的延迟,PeriodicMemStoreFlusher 类的实现。static class PeriodicMemstoreFlusher extends ScheduledChore {
final HRegionServer server;
final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
final static int MIN_DELAY_TIME = 0; // millisec
public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
this.server = server;
}数据更新超过一定阈值
如果 HBase 的某个 Region 更新的很频繁,而且既没有达到自动刷写阀值,也没有达到内存的使用限制,但是内存中的更新数量已经足够多,比如超过 hbase.regionserver.flush.per.changes 参数配置,默认为30000000,那么也是会触发刷写的。
手动触发刷写
除了 HBase 内部一些条件触发的刷写之外,我们还可以通过执行相关命令或 API 来触发 MemStore 的刷写操作。比如调用可以调用 Admin 接口提供的方法:
void flush(TableName tableName) throws IOException;
void flushRegion(byte[] regionName) throws IOException;
void flushRegionServer(ServerName serverName) throws IOException;分别对某张表、某个 Region 或者某个 RegionServer 进行刷写操作。也可以在 Shell 中通过执行 flush 命令:
hbase> flush 'TABLENAME'
hbase> flush 'REGIONNAME'
hbase> flush 'ENCODED_REGIONNAME'
hbase> flush 'REGION_SERVER_NAME'
需要注意的是,以上所有条件触发的刷写操作最后都会检查对应的 HStore 包含的 StoreFiles 文件超过 hbase.hstore.blockingStoreFiles 参数配置的个数,默认值是16。如果满足这个条件,那么当前刷写会被推迟到 hbase.hstore.blockingWaitTime 参数设置的时间后再刷写。在阻塞刷写的同时HBase 还会请求 Split 或 Compaction 操作。
3、触发 MemStore 刷写的操作
常见的 put、delete、append、increment、调用 flush 命令、Region 分裂、Region Merge、bulkLoad HFiles 以及给表做快照操作都会对 上面的相关条件做检查,以便判断要不要做刷写操作。
三、MemStore 刷写策略(FlushPolicy)
在 HBase 1.1 之前,MemStore 刷写是 Region 级别的。就是说,如果要刷写某个 MemStore ,MemStore 所在的 Region 中其他 MemStore 也是会被一起刷写的!这会造成一定的问题,比如小文件问题。针对这个问题,HBASE-10201/HBASE-3149引入列族级别的刷写。我们可以通过 hbase.regionserver.flush.policy 参数选择不同的刷写策略。
目前 HBase 2.0.2 的刷写策略全部都是实现 FlushPolicy 抽象类的。并且自带三种刷写策略:FlushAllLargeStoresPolicy,FlushNonSloppyStoresFirstPolicy和 FlushAllStoresPolicy
FlushAllStoresPolicy
这种刷写策略实现最简单,直接返回当前 Region 对应的所有 MemStore。也就是每次刷写都是对 Region 里面所有的 MemStore 进行的,这个行为和 HBase 1.1 之前是一样的。
FlushAllLargeStoresPolicy
在 HBase 2.0 之前版本是 FlushLargeStoresPolicy,后面被拆分成分 FlushAllLargeStoresPolicy 和FlushNonSloppyStoresFirstPolicy,参见 HBASE-14920。这种策略会先判断 Region 中每个 MemStore 的使用内存(OnHeap + OffHeap)是否大于某个阀值,大于这个阀值的 MemStore 将会被刷写。阀值的计算是由
//region.getMemStoreFlushSize() / familyNumber
//就是 hbase.hregion.memstore.flush.size 参数的值除以相关表列族的个数
flushSizeLowerBound = max(region.getMemStoreFlushSize() / familyNumber, hbase.hregion.percolumnfamilyflush.size.lower.bound.min)
//如果设置了 hbase.hregion.percolumnfamilyflush.size.lower.bound
flushSizeLowerBound = hbase.hregion.percolumnfamilyflush.size.lower.bound
计算逻辑上面已经很清晰的描述了。hbase.hregion.percolumnfamilyflush.size.lower.bound.min 默认值为 16MB,而 hbase.hregion.percolumnfamilyflush.size.lower.bound 没有设置。
比如当前表有3个列族,其他用默认的值,那么 flushSizeLowerBound = max((long)128 / 3, 16) = 42。如果当前 Region 中没有 MemStore 的使用内存大于上面的阀值,FlushAllLargeStoresPolicy 策略就退化成 FlushAllStoresPolicy 策略了,也就是会对 Region 里面所有的 MemStore 进行 Flush。
FlushNonSloppyStoresFirstPolicy
HBase 2.0 引入了 in-memory compaction,参见 HBASE-13408。如果我们对相关列族hbase.hregion.compacting.memstore.type 参数的值不是 NONE,那么这个 MemStore 的 isSloppyMemStore 值就是 true,否则就是 false。FlushNonSloppyStoresFirstPolicy 策略将 Region 中的 MemStore 按照 isSloppyMemStore 分到两个 HashSet 里面(sloppyStores 和 regularStores)。然后判断 regularStores 里面是否有 MemStore 内存占用大于相关阀值的 MemStore ,有的话就会对这些 MemStore 进行刷写,其他的不做处理,这个阀值计算和 FlushAllLargeStoresPolicy 的阀值计算逻辑一致。如果 regularStores 里面没有 MemStore 内存占用大于相关阀值的 MemStore,这时候就开始在 sloppyStores 里面寻找是否有 MemStore 内存占用大于相关阀值的 MemStore,有的话就会对这些 MemStore 进行刷写,其他的不做处理。如果上面 sloppyStores 和 regularStores 都没有满足条件的 MemStore 需要刷写,这时候就FlushNonSloppyStoresFirstPolicy 策略久退化成 FlushAllStoresPolicy 策略了。
刷写的过程
MemStore 的刷写过程很复杂,很多操作都可能触发,但是这些条件触发的刷写最终都是调用 HRegion 类中的 internalFlushcache 方法。
protected FlushResultImpl internalFlushcache(WAL wal, long myseqid,
Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
PrepareFlushResult result =
internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker, tracker);
if (result.result == null) {
return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
} else {
return result.result; // early exit due to failure from prepare stage
}
}
从上面的实现可以看出,Flush 操作主要分以下几步做的
prepareFlush 阶段:刷写的第一步是对 MemStore 做 snapshot,为了防止刷写过程中更新的数据同时在 snapshot 和 MemStore 中而造成后续处理的困难,所以在刷写期间需要持有 updateLock 。持有了 updateLock 之后,这将阻塞客户端的写操作。所以只在创建 snapshot 期间持有 updateLock,而且 snapshot 的创建非常快,所以此锁期间对客户的影响一般非常小。对 MemStore 做 snapshot 是 internalPrepareFlushCache 里面进行的。
flushCache 阶段:如果创建快照没问题,那么返回的 result.result 将为 null。这时候我们就可以进行下一步 internalFlushCacheAndCommit。其实 internalFlushCacheAndCommit 里面包含两个步骤:flushCache 和 commit 阶段。flushCache 阶段其实就是将 prepareFlush 阶段创建好的快照写到临时文件里面,临时文件是存放在对应 Region 文件夹下面的 .tmp 目录里面。
commit 阶段:将 flushCache 阶段生产的临时文件移到(rename)对应的列族目录下面,并做一些清理工作,比如删除第一步生成的 snapshot。
参考:https://www.iteblog.com/archives/2497.html




