暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

南方某省医保实时数仓项目:Flink SQL消费Kafka数据到HBase填坑指南

大数据从业者 2022-06-24
3030

背景

     最近支持南方某省医保项目,业务ISV厂商有个业务场景需要基于FlinkSQL构建实时数仓,数据通道为:Kafka->Flink->HBase。

      本文梳理总结使用FlinkSQL消费Kafka数据实时写入HBase过程中遇到的问题、解决思路、解决方法等。

       温馨提示:微信公众号私信有时效性、过期不可回复,很多朋友私信我没及时看到、未能回复。最近新建钉钉群(44703541),欢迎朋友加入!

环境说明

    Kafka:2.3.0
    Flink:1.12.2
    Hadoop:3.0.0-cdh6.2.0
    HBase:2.1.0-cdh6.2.0

    Flink SQL

    注意:测试用例并非现场表结构

    创建Kafka表

      CREATE TABLE t_kafka (
      rowkey STRING
      ,name STRING
      ,age INT
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'test', 
      'scan.startup.mode' = 'earliest-offset', 
      'properties.bootstrap.servers' = 'felixzh:9092',
      'properties.group.id' = 'group',
      'value.format' = 'json',
      'value.json.fail-on-missing-field' = 'true',
      'value.fields-include' = 'ALL');

      创建HBase表

        CREATE TABLE t_hbase (
        rowkey STRING
        ,family1 ROW<name STRING>
        ,PRIMARY KEY (rowkey) NOT ENFORCED
        ) WITH (
        'connector' = 'hbase-2.2',
        'table-name' = 'TestNameSpace.TestTable', 
        'zookeeper.znode.parent' = '/hbase', 
        'zookeeper.quorum' = 'felixzh:2181',
        'sink.parallelism' = '3');

        提交FlinkSQL任务

          INSERT INTO t_hbase SELECT rowkey, ROW(name) FROM t_kafka;

          问题1:select超时不可用

                场景描述:出于安全性考虑、受限于网络隔离环境,业务ISV并不能直接登录到集群节点。所以,需要在ISV主机安装配置Flink(称为Client节点)。

                问题描述:Client节点启动sql-client.sh,DDL语句正常、DML语句insert…into…select正常、select…from…语句失败,提示

            Caused by: java.net.ConnectException: Connection timed out (Connection timed out)

                   其实,很容易知道是网络有问题。不过,Flink1.12.2版本这部分的异常日志太笼统,社区有相关issue进行优化,最起码打印出来具体的IP和port。


                  分析:集群节点ping命令测试与Client节点之间的网络,发现网络不可达。这也解释Client节点select语句连接超时问题:select语句提交到集群节点执行没问题,但是返回结果到Client节点时因网络不可达报错;而insert into select语句执行同样在集群节点,并不输出结果到Client节点而是直接输出到具体的insert into的sink端(即集群HBase服务),所以不会受网络不可达影响。

                   至此,该问题转给现场同学排查网络问题。

            问题2:FileNotFoundException

                  问题描述:Client节点和集群节点提交insert into select语句,TaskManager日志报错如下:

              Failed to check remote dir status /tmp/hbase-username/hbase/lib
              FileNotFoundException: File tmp/hbase-username/hbase/lib does not exist

                     分析:sql-client进程加载的hbase-sitex.xml并没有该路径,继续撸源码发现flink-sql-connector模块存在hbase-default.xml文件,如下:

                flink-release-1.12.2\flink-connectors\flink-sql-connector-hbase-2.2\src\main\resources\hbase-default.xml

                   相关参数hbase.dynamic.jars.dir即为/tmp/hbase-username/hbase/lib,详见下图:


                        解决方法:将hbase.dynamic.jars.dir改为集群节点已存在的路径。

                问题3:NoClassDefFoundError

                       问题描述:Client节点和集群节点提交insert into select语句运行18个小时左右,TaskManager日志报错如下:

                  Java.lang.NoClassDefFoundError: org/apache/Hadoop/hbase/ServerName

                         分析:flink-sql-connector-hbase模块是include相关HBase依赖如图:

                       然后,查找flink-sql-connector-hbase-2.2_2.12-1.12.2.jar是否存在ServerName类,如图:

                         虽然存在ServerName类,但是类路径被relocation了,如图:

                        解决方法:由于ServerName类存在于hbase-common模块中。所以,拷贝集群HBase节点hbase-common-2.1.0-cdh6.2.0.jar到flink/lib路径即可。

                  问题4:RegionTooBusyException

                         问题描述:如果说上述三个问题属于功能问题,那么这个问题无疑属于性能问题。使用FlinkSQL消费Kafka数据写入HBase预分区表,大数据量时TaskManager日志报错如下:

                    Execption=org.apache.hadoop.hbase.RegionTooBusyException
                    Above parallelPutToStoreThreadLimit(10)

                           分析:通过撸HBase源码发现,RegionTooBusyException是StoreHotnessProtector类抛出的。StoreHotnessProtector类很有意思,是HBase用于限制热点数据的,如图:

                    所以说,该类触发异常,那肯定是存在热点数据,rowkey设计可能不合理。当然,这三个参数默认值也不是很合理。

                          解决方法:反馈到客户,建议优化下rowkey的设计规则。同时,酌情调大三个参数的默认值。为避免贸然调整参数,引起不好的影响且不想重启HBase集群,仅对FlinkSQL写入的HBase表进行了表级别的参数调整,操作如下:

                      • 1.    disable 'TestNameSpace.TestTable'
                        2.    alter  'TestNameSpace.TestTable', CONFIGURATION => {'hbase.region.store.parallel.put.limit.min.column.count ' =>  200, 'hbase.region.store.parallel.put.limit' =>  100}
                        3. enable 'TestNameSpace.TestTable'


                      结论

                             至此,支持该医保项目暂时告一段落。

                            温馨提示:微信公众号私信有时效性、过期不可回复,很多朋友私信我没及时看到、未能回复。最近新建钉钉群(44703541),欢迎朋友加入!


                      文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论