Goal: real-time ingestion into the data lake with minute-level latency
Environment:
Spark: 2.4.5
Flink: 1.12.5
Hive: 1.2.1
Hudi: 0.9.0
I. Linux preparation
Create a flink user, switch to it, and add the following environment variable:
export HADOOP_CLASSPATH=`hadoop classpath`
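A minimal sketch of this step, assuming root access and a bash login shell for the flink account (adjust to your environment):
# as root: create the account, then switch to it
useradd -m flink
su - flink
# as flink: make the variable permanent (bash login shell assumed)
echo 'export HADOOP_CLASSPATH=`hadoop classpath`' >> ~/.bashrc
source ~/.bashrc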
II. Build Hudi 0.9.0
Download the Hudi source and apply patches
# Clone the Hudi source
git clone -b release-0.9.0 --depth 1 https://github.com/apache/hudi.git hudi-0.9-cherry-pick
# Enter the project directory
cd hudi-0.9-cherry-pick
# Fetch the patch
git fetch origin pull/3519/head:cherry-pick
# To fix the error thrown when closing the metastore client, the PR referenced in this issue also needs to be applied: https://github.com/apache/hudi/issues/3848
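The fetch above only creates a local branch named cherry-pick; its commits still have to be applied on top of the release-0.9.0 checkout. A sketch of one way to do that (inspect the fetched branch, then cherry-pick the commits you need; the commit ids are placeholders that depend on the PR):
# list what the fetched branch contains
git log --oneline cherry-pick | head
# apply the relevant commit(s) onto release-0.9.0 (commit id is a placeholder)
git cherry-pick <commit-id>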
Build Hudi
1. If building on Windows, comment out the integration-test modules in the root pom.xml:
<!-- <module>hudi-integ-test</module> -->
<!-- <module>packaging/hudi-integ-test-bundle</module> -->
2. Edit the pom file of the hudi-flink-bundle module under the packaging folder and change the Hive version to match your cluster.
For Hive 1.x, modify the properties under the flink-bundle-shade-hive1 profile; for Hive 2.x, those under flink-bundle-shade-hive2; Hive 3.x is similar.
Since my cluster runs Hive 1.2.1, the change looks like this:
<profile>
  <id>flink-bundle-shade-hive1</id>
  <properties>
    <hive.version>1.2.1</hive.version>
    <thrift.version>0.9.2</thrift.version>
    <flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
  </properties>
</profile>
3. For Hive 1.x, copy the cluster's hive-site.xml into the resources folder of the hudi-flink module.
The reason is that the Hive 1.x HiveConf class loads hive-site.xml by looking it up on the classpath from the project root. The relevant source code:
static {
  ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  if (classLoader == null) {
    classLoader = HiveConf.class.getClassLoader();
  }
  hiveDefaultURL = classLoader.getResource("hive-default.xml");
  // Look for hive-site.xml on the CLASSPATH and log its location if found.
  hiveSiteURL = classLoader.getResource("hive-site.xml");
  hivemetastoreSiteUrl = classLoader.getResource("hivemetastore-site.xml");
  hiveServer2SiteUrl = classLoader.getResource("hiveserver2-site.xml");
  for (ConfVars confVar : ConfVars.values()) {
    vars.put(confVar.varname, confVar);
  }
}
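For reference, a one-line sketch of the copy in step 3, assuming the cluster's Hive config sits under /etc/hive/conf (the same path used for the sql-client later) and that you are in the root of the source checkout:
# paths are assumptions: the cluster's Hive conf dir and the hudi-flink module layout
cp /etc/hive/conf/hive-site.xml hudi-flink/src/main/resources/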
4. Build with Maven using the command below; for Hive 2.x, replace flink-bundle-shade-hive1 with flink-bundle-shade-hive2:
mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive1 -Dspark2
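If the build succeeds, the two bundles used later in this walkthrough end up under their modules' target directories:
# verify the bundles were produced
ls packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0.jar
ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.9.0.jar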
III. Prepare the Flink and Kafka environment
Download the prebuilt Flink 1.12.5 package; Hudi 0.9.0 pairs with Flink 1.12.5.
Download page:
https://flink.apache.org/downloads.html#update-policy-for-old-releases
Find the matching release:
https://archive.apache.org/dist/flink/flink-1.12.5/
Download command:
wget https://archive.apache.org/dist/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.11.tgz
Extract the archive:
tar -zxvf flink-1.12.5-bin-scala_2.11.tgz
Modify the standalone cluster configuration
1. Edit the flink-conf.yaml file
# Increase the number of slots so the job is not short of them at startup
taskmanager.numberOfTaskSlots: 8
# Checkpoint-related settings
state.backend: rocksdb
state.checkpoints.dir: hdfs:///tmp/flink/checkpoints/test-flink/
state.savepoints.dir: hdfs:///tmp/flink/checkpoints/test-flink/
state.backend.incremental: true
state.checkpoint-storage: filesystem
execution.checkpointing.interval: 15s
execution.checkpointing.mode: EXACTLY_ONCE
classloader.resolve-order: parent-first
restart-strategy.failure-rate.max-failures-per-interval: 30
restart-strategy.failure-rate.failure-rate-interval: 1 min
restart-strategy.failure-rate.delay: 30 s
2. Edit the sql-client-defaults.yaml file
# Hive catalog
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf   # the actual path may differ on your cluster
    default-database: flink_catalog

execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: flink_catalog
Create the checkpoint directory
hdfs dfs -mkdir -p hdfs:///tmp/flink/checkpoints/test-flink
Create the flink_catalog database to hold Flink's metadata tables
create database flink_catalog;
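Because the sql-client uses the Hive catalog, this database lives in the Hive metastore; a quick way to create it, assuming the hive CLI is available on the node (beeline works too):
# run the DDL against the Hive metastore (hive CLI assumed)
hive -e "CREATE DATABASE IF NOT EXISTS flink_catalog;"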
Copy the compiled hudi-flink-bundle_2.11-0.9.0.jar into Flink's lib directory
The jar sits in the target directory of the hudi-flink-bundle module under packaging.
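Assuming the Flink distribution was unpacked next to the Hudi source checkout, the copy looks like this (adjust the paths to your layout):
# both paths are assumptions about where things were unpacked/built
cp hudi-0.9-cherry-pick/packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0.jar flink-1.12.5/lib/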
Download the Kafka connector and Hive dependency jars into the lib directory
1. Download flink-sql-connector-hive-1.2.2_2.11-1.12.5.jar
Download page:
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/hive/
Pick the jar built for your Flink version; download command:
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.12.5/flink-sql-connector-hive-1.2.2_2.11-1.12.5.jar
2. Download flink-sql-connector-kafka_2.11-1.12.5.jar
Download page:
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
Pick the jar built for your Flink version; download command:
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.5/flink-sql-connector-kafka_2.11-1.12.5.jar
Create the Kafka topic:
flink_on_hudi_cow_test
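A sketch of the topic creation, assuming an older Kafka whose tooling still takes --zookeeper (newer releases use --bootstrap-server instead); the ZooKeeper address, partition count, and replication factor below are placeholders:
# topic creation sketch; xxx:2181, partitions and replication factor are placeholders
./kafka-topics.sh --create --zookeeper xxx:2181 --replication-factor 1 --partitions 2 --topic flink_on_hudi_cow_test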
IV. Start the standalone cluster
./bin/start-cluster.sh
V. Start the sql-client
./bin/sql-client.sh embedded
After entering the SQL client, run the following statements:
1. Create the Kafka source table
CREATE TABLE flink_catalog.kafka_source_test (
  user_id STRING,
  order_amount BIGINT,
  log_ts TIMESTAMP(3),
  _hoodie_is_deleted BOOLEAN,
  part STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink_on_hudi_cow_test',
  'properties.bootstrap.servers' = 'xxx',   -- fill in the Kafka broker addresses
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'test',
  'format' = 'json'
);
2. Create the Hudi sink table
CREATE TABLE flink_catalog.hudi_test (
  user_id VARCHAR(20),
  order_amount BIGINT,
  log_ts TIMESTAMP(3),
  _hoodie_is_deleted BOOLEAN,
  `part` VARCHAR(20)
) PARTITIONED BY (`part`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///apps/hive/warehouse/test.db/hudi_test',   -- where Hudi writes its files
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'log_ts',   -- when records share the same key, keep the latest one according to this column
  'hoodie.datasource.write.recordkey.field' = 'user_id',   -- Hudi record key
  'write.bucket_assign.tasks' = '2',
  'write.tasks' = '2',
  'hive_sync.enable' = 'true',   -- enable Hive sync
  'hive_sync.mode' = 'hms',   -- sync mode: hms or jdbc, see the Hudi docs for details
  'hive_sync.db' = 'test',   -- Hive database to sync to
  'hive_sync.table' = 'hudi_test',   -- Hive table to sync to
  'hive_sync.metastore.uris' = 'thrift://xxx'   -- fill in the Hive metastore address
);
3. Consume from Kafka and write to Hudi
insert into flink_catalog.hudi_test select * from flink_catalog.kafka_source_test;
VI. Produce data to Kafka
./kafka-console-producer.sh --broker-list xxx --topic flink_on_hudi_cow_test
Sample data:
{"user_id":"a1","order_amount":113.0,"log_ts":"2020-06-30 12:12:12","_hoodie_is_deleted":"false","part":"par1"}{"user_id":"a2","order_amount":113.0,"log_ts":"2020-06-30 12:12:12","_hoodie_is_deleted":"false","part":"par1"}{"user_id":"a2","order_amount":113.0,"log_ts":"2020-06-30 12:12:12","_hoodie_is_deleted":"true","part":"par1"}{"user_id":"a3","order_amount":160.0,"log_ts":"2020-06-30 12:22:16","_hoodie_is_deleted":"false","part":"par2"}{"user_id":"a5","order_amount":160.0,"log_ts":"2020-06-30 12:22:16","_hoodie_is_deleted":"false","part":"par3"}{"user_id":"a8","order_amount":260.0,"log_ts":"2020-06-30 13:22:16","_hoodie_is_deleted":"false","part":"par1"}
The _hoodie_is_deleted field is the flag Hudi uses to decide whether a record is a delete; the actual logic lives in Hudi's payload class. true means delete, false means a normal write.
a2 above has two records, and the second one has _hoodie_is_deleted set to true, so a2 ends up deleted and does not appear in the table.
If everything goes well, the Kafka records will show up in the Hudi table after a dozen or so seconds.
Why a dozen or so seconds? Because Hudi only flushes Parquet files after Flink triggers a checkpoint, and we set the standalone cluster's checkpoint interval to 15s:
execution.checkpointing.interval: 15s
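To watch the commits land, you can list the table's .hoodie metadata directory on HDFS; a new commit file appears there after each successful checkpoint (the path is the one from the sink table definition):
hdfs dfs -ls hdfs:///apps/hive/warehouse/test.db/hudi_test/.hoodie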
In the jobmanager log you can see entries like the following:
2022-02-14 11:15:18,987 INFO org.apache.hudi.hive.HiveSyncTool [] - Hive table hudi_test is not found. Creating it
2022-02-14 11:15:19,672 INFO org.apache.hudi.hive.HiveSyncTool [] - Schema sync complete. Syncing partitions for hudi_test
2022-02-14 11:15:19,672 INFO org.apache.hudi.hive.HiveSyncTool [] - Last commit time synced was found to be null
2022-02-14 11:15:19,672 INFO org.apache.hudi.sync.common.AbstractSyncHoodieClient [] - Last commit time synced is not known, listing all partitions in hdfs:///apps/hive/warehouse/test.db/hudi_test,FS :DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1401082153_30, ugi=flink (auth:SIMPLE)]]
2022-02-14 11:15:19,685 INFO org.apache.hudi.hive.HiveSyncTool [] - Storage partitions scan complete. Found 3
2022-02-14 11:15:19,706 INFO org.apache.hudi.hive.HiveSyncTool [] - New Partitions [par1, par2, par3]
2022-02-14 11:15:19,706 INFO org.apache.hudi.hive.ddl.HMSDDLExecutor [] - Adding partitions 3 to table hudi_test
2022-02-14 11:15:19,782 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 3 (type=CHECKPOINT) @ 1644808519775 for job cdbc1b70c3e6a1d337e80365351a72b8.
2022-02-14 11:15:19,783 INFO org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [taking checkpoint 3] success!
2022-02-14 11:15:19,805 INFO org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [handle write metadata event for instant 20220214111517] success!
2022-02-14 11:15:19,807 INFO org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [handle write metadata event for instant 20220214111517] success!
2022-02-14 11:15:19,846 INFO org.apache.hudi.hive.HiveSyncTool [] - Changed Partitions []
2022-02-14 11:15:19,846 INFO org.apache.hudi.hive.ddl.HMSDDLExecutor [] - No partitions to change for hudi_test
2022-02-14 11:15:19,900 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for hudi_test
The log shows that, because Hive sync is enabled, Hudi checks whether the Hive table exists and creates it for us if it does not.
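A quick way to confirm the sync from the Hive side (assuming the hive CLI is available; beeline works the same way) is to list the table's partitions, which should match the three partitions reported in the log above:
# list the synced partitions in the Hive metastore
hive -e "SHOW PARTITIONS test.hudi_test;"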
VII. Query test with Spark
Querying from Spark needs another jar from the build:
hudi-utilities-bundle_2.11-0.9.0.jar, which sits in the target directory of the hudi-utilities-bundle module under packaging.
Run:
spark-shell --master yarn --queue test --num-executors 2 --executor-cores 2 --executor-memory 6g --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.hive.convertMetastoreParquet=false' --jars hudi-utilities-bundle_2.11-0.9.0.jar
scala> spark.sql("select * from test.hudi_test limit 10").show(10,false)
That wraps up the end-to-end test of real-time lake ingestion with Flink on Hudi.




