Hbase 作为 Hadoop 全家桶中,非常重要的存储组件,适用于海量数据的随机查询,使用是非常广泛的。
实时数仓项目使用 Kafka 作为数仓的基础表,我们也会把 Kafka 的数据往 Hbase 写一份,方便其他场景使用,比如:做维表
Flink Hbase 表默认使用 TableScan 一次性加载全量维表数据来关联,维表数据不能更新,适用场景比较少(由于 TableScan 完成后,算子状态为 Finish,导致任务不能完成 Checkpoint)
Flink Hbase 时态表 Lookup 功能(仅限于关联 Hbase 表主键,不支持非主键),支持缓存和透查外部系统,完美解决默认表维表数据不能更新和不能 完成 Checkpoint 的问题
关联 sql 如下
CREATE TEMPORARY TABLE hbase_behavior_conf (
rowkey STRING
,cf ROW(item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3))
) WITH (
'connector' = 'hbase-2.2'
,'zookeeper.quorum' = 'thinkpad:12181'
,'table-name' = 'user_log'
,'lookup.cache.max-rows' = '10000'
,'lookup.cache.ttl' = '1 minute' -- ttl time 超过这么长时间无数据才行
,'lookup.async' = 'true'
);
INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, c.cf.item_id, a.ts
FROM user_log a
left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c ON a.user_id = rowkey -- 主键
-- left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c ON a.user_id = cf.item_id --非主键
where a.behavior is not null;关联非主键报错如下:
Caused by: java.lang.IllegalArgumentException: Currently, HBase table can only be lookup by single rowkey.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource.getLookupRuntimeProvider(HBaseDynamicTableSource.java:52)
at org.apache.flink.table.planner.plan.utils.LookupJoinUtil.getLookupFunction(LookupJoinUtil.java:172)
at org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin.explainTerms(CommonPhysicalLookupJoin.scala:168)
at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391)
at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
at java.util.HashMap.hash(HashMap.java:339)
at java.util.HashMap.get(HashMap.java:557)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
... 45 more
Flink 关联 Hbase 场景也有关联非主键的场景,刚开始用的时候,为了方便就直接实现一个 UDF,启动的时候加载全量的 Hbase 表数据到内存中(维表数据并不多),根据策略定期去Hbase 重新加载最新的数据。
受 Lookup Source 的启发,想实现一个关联 Hbase 非主键的 UDF,支持缓存和缓存失效透查 Hbase
需求如下:
1、UDF 关联 Hbase 表非主键
2、支持缓存时间和缓存数量的控制
3、关联键缓存有值就从缓存出
4、关联键缓存没有值就从 Hbase 查,结果放到缓存中
基于这些需求,实现了这样的一个 UDTF(由于 Hbase 表非主键可能有重复值,所以使用 Table Function,如果有多条数据,返回多条数据到 SQL 端处理)
## UDF 代码
@FunctionHint(output = new DataTypeHint("ROW<arr ARRAY<STRING>>"))
def eval(key: String): Unit = {
// if key is empty
if (key == null || key.length == 0) {
return
}
// insert
val rowKind = RowKind.fromByteValue(0.toByte)
val row = new Row(rowKind, 1)
// get result from cache
var list: ListBuffer[Array[String]] = cache.getIfPresent(key)
if (list != null) {
list.foreach(arr => {
row.setField(0, arr)
collect(row)
})
return
}
// cache get nothing, query hbase
list = queryHbase(key)
if (list.length == 0) {
// if get nothing
return
}
// get result, add to cache
cache.put(key, list)
list.foreach(arr => {
row.setField(0, arr)
collect(row)
})
// LOG.info("finish join key : " + key)
}
/**
* query hbase
*
* @param key join key
* @return query result row
*/
private def queryHbase(key: String): ListBuffer[Array[String]] = {
val scan: Scan = new Scan();
qualifier.foreach(item => scan.addColumn(family, item))
val filter = new SingleColumnValueFilter(family, qualifier.head, CompareOperator.EQUAL, key.getBytes("UTF8"))
scan.setFilter(filter)
val resultScanner = table.getScanner(scan)
val it = resultScanner.iterator()
val list = new ListBuffer[Array[String]]「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。





