暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink-Sink-HBase

大数据从业者 2019-01-17
2613

前言

说来惭愧,原计划日更的,但至今已停更本月以上。只能说太忙了(又给自己找借口),最主要是越来越想写一些高质量的文章,包括新颖性、丰富性、完整性等方面。这半月内几次想更新写点东西,都没下笔,总感觉内容不完整。今天尝试写写高质量的文章。

项目需要Flink-Sink-Hbase功能(即:写Hbase),但目前Flink官方只支持Flink-Source-Hbase(即:读Hbase)。没办法,撸起袖子自己干,自定义实现Flink-Sink-Hbase,要求性能要很好。

自定义Flink-Sink实现

这个还是很简单的,写个SinkHbase类直接继承RichSinkFunction类并实现CheckpointedFunction。继承RichSinkFunction类用于实现写Hbase功能,实现CheckpointedFunction用于在写Hbase功能中引入Flink checkpoint机制,实现需要的语义(如exactly-once)。

简单演示如下:

public class SinkHbase<T> extends RichSinkFunction<T>  implements CheckpointedFunction {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(SinkHbase.class);

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

}

@Override

public void close() throws Exception {

super.close();

}

@Override

public void invoke(T value, Context context) throws Exception {

}

@Override

public void snapshotState(FunctionSnapshotContext context) throws Exception {

}

@Override

public void initializeState(FunctionInitializationContext context) throws Exception {

}

}

写Hbase实现

方法1:基于HbaseClientAPI直接put或者batch。这种方式同步执行,没有buffer,每次调用都触发一次rpc请求,效率很低。这种互联网一大堆,不再赘述。

方法2:基于HbaseClientAPI使用BufferedMutator接口(BufferedMutatorImpl实现类),实现buffer机制,定时定量(Hbase1.x只实现了定量,即buffer size达到阈值;Hbase2.x在定量基础上,也实现了定时,如果数据未达到buffer size,也不能一直不触发rpc请求,这就产生了定时触发)触发异步rpc请求。该方法效率比较高,实时性也比较好。这种方式应用很广泛,比如:知乎AI画像、阿里DataX、spark-hbase(hbasebulkput)、storm-hbase、MapReduce(saveAsNewAPIHadoopDataset)等。源码里面有个例子(源码有点长,这里就不贴出了,感兴趣的可以git clone源码查看):

\hbaseexamples\src\main\java\org\apache\hadoop\hbase\client\example\BufferedMutatorExample.java

方法3:bulkload方式。该方法先写HFile文件到HDFS,再利用LoadIncrementalHFiles类的doBulkLoad方法将HFile文件导入Hbase RegionServer。bulkload方式效率很高,适用于大量数据导入Hbase。应用也很广泛,如:spark-hbase(hbasebulkload)。通常的做法是利用mapreduce job生成HFile文件,我们这里要基于Flink,没必要再跟maprpredue糅合了。我们采用更底层一些的做法,直接调用StoreFile.WriterBuilder构造writer写hfile文件。源码里面有个例子(源码有点长,这里就不贴出了,感兴趣的可以git clone源码查看)

\hbase-server\src\test\java\org\apache\hadoop\hbase\regionserver\CreateRandomStoreFile.java

Flink-Sink-Hbase实现

参考写Hbase中方法2与方法3中的例子,本人已经实现基于这两种方法的Flink-Sink-Hbase。感兴趣的可以自行实践,其实也不是很难。碍于公司网络限制,目前不方便提供实现的源码。后期有时间,脱敏处理后,慢慢同步到本人开源项目。

项目github地址:https://github.com/felixzh2015/felixzhFlinkExamples

希望有机会的话,可以提交pr到Flink官方源码,反馈社区。类似pr曾经在社区出现过,大概是16年好像,后来提交pr的人说没空搞了,有兴趣的可以追踪一下,https://github.com/apache/flink/pull/2332


总结

通过这次项目,对Flink和Hbase都有了更深的研究。本文虽说篇幅很短,自认为结构与完整性还不错,希望对有相关需求的朋友有所帮助,有问题欢迎留言!

                欢迎关注微信公众号:大数据从业者



文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论