By大数据研习社
概要:与老版本相比,Flink1.13版本与Hive集成配置有一些变化,从Flink1.13开始移除了sql-client-defaults.yml配置⽂件,catalog相关配置可以放在新增的sql-init.sql文件来配置。
Flink SQL与Hive集成需要添加相关依赖包如下所示:flink-shaded-hadoop-2-uber-2.8.3-10.0.jarflink-sql-connector-hive-2.3.6_2.11-1.13.6.jar#其他依赖包(在hadoop安装目录的shere/hadoop/common目录下)commons-configuration-1.6.jarcommons-logging-1.1.3.jarhadoop-mapreduce-client-common-2.9.2.jarhadoop-mapreduce-client-core-2.9.2.jarhadoop-mapreduce-client-hs-2.9.2.jarhadoop-mapreduce-client-jobclient-2.9.2.jarhtrace-core4-4.1.0-incubating.jarjavax.servlet-api-3.1.0.jar在Flink安装目录的conf目录下,修改flink-conf.yaml配置文件,添加如下内容:classloader.check-leaked-classloader: false准备初始化脚本init.sql,并将init.sql文件放入Flink的conf目录下。 'default-database'='default', 'hive-conf-dir'='/home/hadoop/app/hive/conf',SET 'execution.runtime-mode' = 'batch';SET 'sql-client.execution.result-mode' = 'table';SET 'table.sql-dialect'='hive';初始化脚本中我们创建了hive catalog,相关配置项如下:
由于Flink SQL是通过metastore服务来访问Hive的MySQL数据库管理元数据,所以首先需要启动Hive的metastore服务:bin/hive --service metastore启动SQL Client时通过-i选项指定初始化脚本即可:bin/sql-client.sh -i conf/sql-init.sql注意:Flink1.13开始移除了sql-client-defaults.yml配置⽂件,所以在该配置⽂件配置catalog的⽅法就不存在了,目前相关配置添加到sql-init.sql文件即可。Flink SQL>use catalog MyCatalog;Flink SQL>show databases;Flink SQL>select * from MyCatalog.test.stu;备注:MyCatalog为catalog,test为Hive中的数据库,stu为test中的表。Flink目前支持两种SQL方言(SQL dialects),分别为:default和hive。默认的SQL方言是default,如果要使用Hive的语法,需要将SQL方言切换到hive。Flink SQL>set table.sql-dialect=hive;Flink SQL>CREATE TABLE hive_dialect_clicklog(`user` STRING,`url` STRING,`cTime` STRING);Flink SQL>insert into hive_dialect_clicklog select 'Mary','./prod?id=7','2022-02-02 12:01:45';在Hive客户端可以查询Flink SQL中创建的表及插入的数据。hive>select * from hive_dialect_clicklog;备注:hive_dialect_clicklog为Flink SQL创建的表,该表的元数据信息存储在MySQL数据库,所以即使Flink SQL重启后,一样可以在Hive中查询hive_dialect_clicklog表中的数据。长按识别左侧二维码
关注领福利
领10本经典大数据书