By大数据研习社
概要:1.案例使用最新稳定版本Flink1.14.3。
2.Flink Table和SQL从Flink1.12版本已经成熟,可以在生产上放心使用。
3.Flink Table和SQL从Flink1.12实现了流批统一的所有特性。
4.Flink Table和SQL与Hive集成需要特别注意版本的兼容性。
1 需求
需求:Flink Table API从Hive数据仓库读取表数据。2 添加Maven依赖
<groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.11</artifactId> <version>${flink.version}</version> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> <artifactId>hadoop-hdfs</artifactId> <groupId>org.apache.hadoop</groupId>3 准备Hive数据源
Mary,./home,2022-02-02 12:00:00Bob,./cart,2022-02-02 12:00:00Mary,./prod?id=1,2022-02-02 12:00:05Liz,./home,2022-02-02 12:01:00Bob,./prod?id=3,2022-02-02 12:01:30Mary,./prod?id=7,2022-02-02 12:01:45bin/hdfs dfs -put click.txt data/clicklog/inputcreate database if not exists test location "/user/hive/warehouse/test";create table if not exists clicklogrow format delimited fields terminated by ","load data inpath '/data/clicklog/input/click.txt' into table clicklog;4 代码实现
Flink Table API读取Hive的完整代码如下所示。package com.bigdata.chap02;import org.apache.flink.table.api.*;import org.apache.flink.table.catalog.hive.HiveCatalog;import org.apache.flink.table.module.hive.HiveModule;public class FlinkTableAPIFromHive { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings TableEnvironment tEnv = TableEnvironment.create(settings); String name = "myCatalog"; String defaultDatabase = "test";//配置文件hive-site.xml存放在项目中的data/etc/目录 String hiveConfDir = "data/etc/"; //加载Hive Module(可以使用hive的UDF) tEnv.loadModule(name, new HiveModule("2.3.6")); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); tEnv.registerCatalog(name, hive); //4、设置当前sesion使用的catalog和database tEnv.useDatabase(defaultDatabase); tEnv.executeSql("select * from clicklog")5 导入Hive配置文件
Flink Table API创建HiveCatalog通过如下代码:HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);备注:hiveConfDir参数表示hive-site.xml配置文件路径(data/etc/),Flink Table API通过读取该配置实例化HiveCatalog。6 测试运行
在idea工具中,右键项目选择Run运行Flink Table,如果能在控制台看到打印如下结果,说明Flink Table API能成功读取Hive数据仓库中的数据。+----+--------------------------------+--------------------------------+--------------------------------+| op | username | url | ctime |+----+--------------------------------+--------------------------------+--------------------------------+| +I | Mary | ./home | 2022-02-02 12:00:00 || +I | Bob | ./cart | 2022-02-02 12:00:00 || +I | Mary | ./prod?id=1 | 2022-02-02 12:00:05 || +I | Liz | ./home | 2022-02-02 12:01:00 || +I | Bob | ./prod?id=3 | 2022-02-02 12:01:30 || +I | Mary | ./prod?id=7 | 2022-02-02 12:01:45 |+----+--------------------------------+--------------------------------+--------------------------------+长按识别左侧二维码
关注领福利
领10本经典大数据书