背景
最近支持南方某省医保项目,业务ISV厂商有个业务场景需要基于FlinkSQL构建实时数仓,数据通道为:Kafka->Flink->HBase。
本文梳理总结使用FlinkSQL消费Kafka数据实时写入HBase过程中遇到的问题、解决思路、解决方法等。
温馨提示:微信公众号私信有时效性、过期不可回复,很多朋友私信我没及时看到、未能回复。最近新建钉钉群(44703541),欢迎朋友加入!
环境说明
Kafka:2.3.0Flink:1.12.2Hadoop:3.0.0-cdh6.2.0HBase:2.1.0-cdh6.2.0
Flink SQL
注意:测试用例并非现场表结构
创建Kafka表
CREATE TABLE t_kafka (rowkey STRING,name STRING,age INT) WITH ('connector' = 'kafka','topic' = 'test','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'felixzh:9092','properties.group.id' = 'group','value.format' = 'json','value.json.fail-on-missing-field' = 'true','value.fields-include' = 'ALL');
创建HBase表
CREATE TABLE t_hbase (rowkey STRING,family1 ROW<name STRING>,PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-2.2','table-name' = 'TestNameSpace.TestTable','zookeeper.znode.parent' = '/hbase','zookeeper.quorum' = 'felixzh:2181','sink.parallelism' = '3');
提交FlinkSQL任务
INSERT INTO t_hbase SELECT rowkey, ROW(name) FROM t_kafka;
问题1:select超时不可用
场景描述:出于安全性考虑、受限于网络隔离环境,业务ISV并不能直接登录到集群节点。所以,需要在ISV主机安装配置Flink(称为Client节点)。
问题描述:Client节点启动sql-client.sh,DDL语句正常、DML语句insert…into…select正常、select…from…语句失败,提示
Caused by: java.net.ConnectException: Connection timed out (Connection timed out)
其实,很容易知道是网络有问题。不过,Flink1.12.2版本这部分的异常日志太笼统,社区有相关issue进行优化,最起码打印出来具体的IP和port。


分析:集群节点ping命令测试与Client节点之间的网络,发现网络不可达。这也解释Client节点select语句连接超时问题:select语句提交到集群节点执行没问题,但是返回结果到Client节点时因网络不可达报错;而insert into select语句执行同样在集群节点,并不输出结果到Client节点而是直接输出到具体的insert into的sink端(即集群HBase服务),所以不会受网络不可达影响。
至此,该问题转给现场同学排查网络问题。
问题2:FileNotFoundException
问题描述:Client节点和集群节点提交insert into select语句,TaskManager日志报错如下:
Failed to check remote dir status /tmp/hbase-username/hbase/libFileNotFoundException: File tmp/hbase-username/hbase/lib does not exist
分析:sql-client进程加载的hbase-sitex.xml并没有该路径,继续撸源码发现flink-sql-connector模块存在hbase-default.xml文件,如下:
flink-release-1.12.2\flink-connectors\flink-sql-connector-hbase-2.2\src\main\resources\hbase-default.xml
相关参数hbase.dynamic.jars.dir即为/tmp/hbase-username/hbase/lib,详见下图:


解决方法:将hbase.dynamic.jars.dir改为集群节点已存在的路径。
问题3:NoClassDefFoundError
问题描述:Client节点和集群节点提交insert into select语句运行18个小时左右,TaskManager日志报错如下:
Java.lang.NoClassDefFoundError: org/apache/Hadoop/hbase/ServerName
分析:flink-sql-connector-hbase模块是include相关HBase依赖如图:

然后,查找flink-sql-connector-hbase-2.2_2.12-1.12.2.jar是否存在ServerName类,如图:

虽然存在ServerName类,但是类路径被relocation了,如图:

解决方法:由于ServerName类存在于hbase-common模块中。所以,拷贝集群HBase节点hbase-common-2.1.0-cdh6.2.0.jar到flink/lib路径即可。
问题4:RegionTooBusyException
问题描述:如果说上述三个问题属于功能问题,那么这个问题无疑属于性能问题。使用FlinkSQL消费Kafka数据写入HBase预分区表,大数据量时TaskManager日志报错如下:
Execption=org.apache.hadoop.hbase.RegionTooBusyExceptionAbove parallelPutToStoreThreadLimit(10)
分析:通过撸HBase源码发现,RegionTooBusyException是StoreHotnessProtector类抛出的。StoreHotnessProtector类很有意思,是HBase用于限制热点数据的,如图:

所以说,该类触发异常,那肯定是存在热点数据,rowkey设计可能不合理。当然,这三个参数默认值也不是很合理。
解决方法:反馈到客户,建议优化下rowkey的设计规则。同时,酌情调大三个参数的默认值。为避免贸然调整参数,引起不好的影响且不想重启HBase集群,仅对FlinkSQL写入的HBase表进行了表级别的参数调整,操作如下:
1. disable 'TestNameSpace.TestTable'2. alter 'TestNameSpace.TestTable', CONFIGURATION => {'hbase.region.store.parallel.put.limit.min.column.count ' => 200, 'hbase.region.store.parallel.put.limit' => 100}3. enable 'TestNameSpace.TestTable'
结论
至此,支持该医保项目暂时告一段落。
温馨提示:微信公众号私信有时效性、过期不可回复,很多朋友私信我没及时看到、未能回复。最近新建钉钉群(44703541),欢迎朋友加入!




