暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink 关联 Hbase 非主键

原创 kerosene 2022-06-14
1322

Hbase 作为 Hadoop 全家桶中,非常重要的存储组件,适用于海量数据的随机查询,使用是非常广泛的。

实时数仓项目使用 Kafka 作为数仓的基础表,我们也会把 Kafka 的数据往 Hbase 写一份,方便其他场景使用,比如:做维表

Flink Hbase 表默认使用 TableScan 一次性加载全量维表数据来关联,维表数据不能更新,适用场景比较少(由于 TableScan 完成后,算子状态为 Finish,导致任务不能完成 Checkpoint)

Flink Hbase 时态表 Lookup 功能(仅限于关联 Hbase 表主键,不支持非主键),支持缓存和透查外部系统,完美解决默认表维表数据不能更新和不能 完成 Checkpoint 的问题

关联 sql 如下

复制代码

CREATE TEMPORARY TABLE hbase_behavior_conf (
   rowkey STRING
  ,cf ROW(item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3))
) WITH (
   'connector' = 'hbase-2.2'
   ,'zookeeper.quorum' = 'thinkpad:12181'
   ,'table-name' = 'user_log'
   ,'lookup.cache.max-rows' = '10000'
   ,'lookup.cache.ttl' = '1 minute' -- ttl time 超过这么长时间无数据才行
   ,'lookup.async' = 'true'
);

INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, c.cf.item_id, a.ts
FROM user_log a
    left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c  ON a.user_id = rowkey -- 主键
    -- left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c ON a.user_id = cf.item_id  --非主键
where a.behavior is not null;

复制代码

关联非主键报错如下:

复制代码

Caused by: java.lang.IllegalArgumentException: Currently, HBase table can only be lookup by single rowkey.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource.getLookupRuntimeProvider(HBaseDynamicTableSource.java:52)
    at org.apache.flink.table.planner.plan.utils.LookupJoinUtil.getLookupFunction(LookupJoinUtil.java:172)
    at org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin.explainTerms(CommonPhysicalLookupJoin.scala:168)
    at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
    at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391)
    at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
    at java.util.HashMap.hash(HashMap.java:339)
    at java.util.HashMap.get(HashMap.java:557)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
    ... 45 more

复制代码


Flink 关联 Hbase 场景也有关联非主键的场景,刚开始用的时候,为了方便就直接实现一个 UDF,启动的时候加载全量的 Hbase 表数据到内存中(维表数据并不多),根据策略定期去Hbase 重新加载最新的数据。

受 Lookup Source 的启发,想实现一个关联 Hbase 非主键的 UDF,支持缓存和缓存失效透查 Hbase

需求如下:
1、UDF 关联 Hbase 表非主键
2、支持缓存时间和缓存数量的控制
3、关联键缓存有值就从缓存出
4、关联键缓存没有值就从 Hbase 查,结果放到缓存中

基于这些需求,实现了这样的一个 UDTF(由于 Hbase 表非主键可能有重复值,所以使用 Table Function,如果有多条数据,返回多条数据到 SQL 端处理)

## UDF 代码

复制代码

@FunctionHint(output = new DataTypeHint("ROW<arr ARRAY<STRING>>"))
  def eval(key: String): Unit = {
    // if key is empty
    if (key == null || key.length == 0) {
      return
    }
    // insert
    val rowKind = RowKind.fromByteValue(0.toByte)
    val row = new Row(rowKind, 1)
    // get result from cache
    var list: ListBuffer[Array[String]] = cache.getIfPresent(key)
    if (list != null) {
      list.foreach(arr => {
        row.setField(0, arr)
        collect(row)
      })
      return
    }
    // cache get nothing, query hbase
    list = queryHbase(key)
    if (list.length == 0) {
      // if get nothing
      return
    }
    // get result, add to cache
    cache.put(key, list)
    list.foreach(arr => {
      row.setField(0, arr)
      collect(row)
    })
    //    LOG.info("finish join key : " + key)
  }

  /**
   * query hbase
   *
   * @param key join key
   * @return query result row
   */
  private def queryHbase(key: String): ListBuffer[Array[String]] = {
    val scan: Scan = new Scan();
    qualifier.foreach(item => scan.addColumn(family, item))

    val filter = new SingleColumnValueFilter(family, qualifier.head, CompareOperator.EQUAL, key.getBytes("UTF8"))
    scan.setFilter(filter)

    val resultScanner = table.getScanner(scan)
    val it = resultScanner.iterator()

    val list = new ListBuffer[Array[String]]
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论