
前言
说来惭愧,原计划日更的,但至今已停更本月以上。只能说太忙了(又给自己找借口),最主要是越来越想写一些高质量的文章,包括新颖性、丰富性、完整性等方面。这半月内几次想更新写点东西,都没下笔,总感觉内容不完整。今天尝试写写高质量的文章。
项目需要Flink-Sink-Hbase功能(即:写Hbase),但目前Flink官方只支持Flink-Source-Hbase(即:读Hbase)。没办法,撸起袖子自己干,自定义实现Flink-Sink-Hbase,要求性能要很好。
自定义Flink-Sink实现
这个还是很简单的,写个SinkHbase类直接继承RichSinkFunction类并实现CheckpointedFunction。继承RichSinkFunction类用于实现写Hbase功能,实现CheckpointedFunction用于在写Hbase功能中引入Flink checkpoint机制,实现需要的语义(如exactly-once)。
简单演示如下:
public class SinkHbase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(SinkHbase.class);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public void invoke(T value, Context context) throws Exception {
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
}
}
写Hbase实现
方法1:基于HbaseClientAPI直接put或者batch。这种方式同步执行,没有buffer,每次调用都触发一次rpc请求,效率很低。这种互联网一大堆,不再赘述。
方法2:基于HbaseClientAPI使用BufferedMutator接口(BufferedMutatorImpl实现类),实现buffer机制,定时定量(Hbase1.x只实现了定量,即buffer size达到阈值;Hbase2.x在定量基础上,也实现了定时,如果数据未达到buffer size,也不能一直不触发rpc请求,这就产生了定时触发)触发异步rpc请求。该方法效率比较高,实时性也比较好。这种方式应用很广泛,比如:知乎AI画像、阿里DataX、spark-hbase(hbasebulkput)、storm-hbase、MapReduce(saveAsNewAPIHadoopDataset)等。源码里面有个例子(源码有点长,这里就不贴出了,感兴趣的可以git clone源码查看):
\hbaseexamples\src\main\java\org\apache\hadoop\hbase\client\example\BufferedMutatorExample.java
方法3:bulkload方式。该方法先写HFile文件到HDFS,再利用LoadIncrementalHFiles类的doBulkLoad方法将HFile文件导入Hbase RegionServer。bulkload方式效率很高,适用于大量数据导入Hbase。应用也很广泛,如:spark-hbase(hbasebulkload)。通常的做法是利用mapreduce job生成HFile文件,我们这里要基于Flink,没必要再跟maprpredue糅合了。我们采用更底层一些的做法,直接调用StoreFile.WriterBuilder构造writer写hfile文件。源码里面有个例子(源码有点长,这里就不贴出了,感兴趣的可以git clone源码查看):
\hbase-server\src\test\java\org\apache\hadoop\hbase\regionserver\CreateRandomStoreFile.java
Flink-Sink-Hbase实现
参考写Hbase中方法2与方法3中的例子,本人已经实现基于这两种方法的Flink-Sink-Hbase。感兴趣的可以自行实践,其实也不是很难。碍于公司网络限制,目前不方便提供实现的源码。后期有时间,脱敏处理后,慢慢同步到本人开源项目。
项目github地址:https://github.com/felixzh2015/felixzhFlinkExamples
希望有机会的话,可以提交pr到Flink官方源码,反馈社区。类似pr曾经在社区出现过,大概是16年好像,后来提交pr的人说没空搞了,有兴趣的可以追踪一下,https://github.com/apache/flink/pull/2332
总结
通过这次项目,对Flink和Hbase都有了更深的研究。本文虽说篇幅很短,自认为结构与完整性还不错,希望对有相关需求的朋友有所帮助,有问题欢迎留言!
欢迎关注微信公众号:大数据从业者





