暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink1.14.3 Table读取Hive数据仓库

大数据研习社 2022-04-25
984


长按二维码关注

大数据领域必关注的公众号

By大数据研习社

概要:1.案例使用最新稳定版本Flink1.14.3。

2.Flink Table和SQL从Flink1.12版本已经成熟,可以在生产上放心使用。

3.Flink Table和SQL从Flink1.12实现了流批统一的所有特性。

4.Flink Table和SQL与Hive集成需要特别注意版本的兼容性。


需求

需求:Flink Table APIHive数据仓库读取表数据。

添加Maven依赖

FlinkTable集成Hive引⼊如下依赖:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <exclusions>
<exclusion>
  <artifactId>hadoop-hdfs</artifactId>
  <groupId>org.apache.hadoop</groupId>
</exclusion>
  </exclusions>
</dependency>

准备Hive数据源

#创建本地文件click.txt
vi click.txt
Mary,./home,2022-02-02 12:00:00
Bob,./cart,2022-02-02 12:00:00
Mary,./prod?id=1,2022-02-02 12:00:05
Liz,./home,2022-02-02 12:01:00
Bob,./prod?id=3,2022-02-02 12:01:30
Mary,./prod?id=7,2022-02-02 12:01:45
 #click.txt上传至hdfs
bin/hdfs dfs -put click.txt data/clicklog/input
 
#创建Hive数据库
create database if not exists test location "/user/hive/warehouse/test";
use test;
 
#创建Hive
drop table clicklog;
create  table if not exists  clicklog
(userName string,
url string,
cTime string)
row format delimited fields terminated by ","
stored as textfile;
 
#加载hdfs中的click.txtHive
load data inpath '/data/clicklog/input/click.txt' into table clicklog;
 
#查询Hive
select * from clicklog;

代码实现

Flink Table API读取Hive的完整代码如下所示。
package com.bigdata.chap02;
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
public class FlinkTableAPIFromHive {
    public static void main(String[] args) {
        //1、创建TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
 
        //2、创建HiveCatalog
//hive连接实例
        String name = "myCatalog";
//hive中的数据库名称
        String defaultDatabase = "test";
//配置文件hive-site.xml存放在项目中的data/etc/目录
        String hiveConfDir     = "data/etc/";
        //加载Hive Module(可以使用hiveUDF)
        tEnv.loadModule(name, new HiveModule("2.3.6"));
        //使用hive方言(hivesql特有的语法)
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
 
        //3、注册myCatalog
        tEnv.registerCatalog(name, hive);
 
        //4、设置当前sesion使用的catalogdatabase
        tEnv.useCatalog(name);
        tEnv.useDatabase(defaultDatabase);
 
        //5、查询hive中的表
        tEnv.executeSql("select * from clicklog")
                .print();
    }
}

导入Hive配置文件

Flink Table API创建HiveCatalog通过如下代码:
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
 
备注:hiveConfDir参数表示hive-site.xml配置文件路径(data/etc/),Flink Table API通过读取该配置实例化HiveCatalog
 

测试运行

idea工具中,右键项目选择Run运行Flink Table,如果能在控制台看到打印如下结果,说明Flink Table API能成功读取Hive数据仓库中的数据。
+----+--------------------------------+--------------------------------+--------------------------------+
| op |                       username |                            url |                          ctime |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                           Mary |                         ./home |            2022-02-02 12:00:00 |
| +I |                            Bob |                         ./cart |            2022-02-02 12:00:00 |
| +I |                           Mary |                    ./prod?id=1 |            2022-02-02 12:00:05 |
| +I |                            Liz |                         ./home |            2022-02-02 12:01:00 |
| +I |                            Bob |                    ./prod?id=3 |            2022-02-02 12:01:30 |
| +I |                           Mary |                    ./prod?id=7 |            2022-02-02 12:01:45 |
+----+--------------------------------+--------------------------------+--------------------------------+
欢迎点赞 + 收藏 + 在看  素质三连 


往期精彩回顾
程序员,如何避免内卷
Apache 架构师总结的 30 条架构原则
【全网首发】Hadoop 3.0分布式集群安装
大数据运维工程师经典面试题汇总(附带答案)
大数据面试130题
某集团大数据平台整体架构及实施方案完整目录
大数据凉凉了?Apache将一众大数据开源项目束之高阁!
实战企业数据湖,抢先数仓新玩法
Superset制作智慧数据大屏,看它就够了
Apache Flink 在快手的过去、现在和未来
华为云-基于Ambari构建大数据平台(上)
华为云-基于Ambari构建大数据平台(下)
【HBase调优】Hbase万亿级存储性能优化总结
【Python精华】100个Python练手小程序
【HBase企业应用开发】工作中自己总结的Hbase笔记,非常全面!
【剑指Offer】近50个常见算法面试题的Java实现代码

长按识别左侧二维码

     关注领福利    

  领10本经典大数据书

文章转载自大数据研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论