
Flink on Hudi: A Step-by-Step Tutorial for Real-Time Data Lake Ingestion

半夏大师兄 2022-02-15

Goal: real-time ingestion into the data lake with minute-level latency


Runtime environment:

Spark: 2.4.5

Flink: 1.12.5

Hive: 1.2.1

Hudi: 0.9.0


Part 1: Linux Preparation

Create a flink account, switch to it, and add the environment variable below:

    export HADOOP_CLASSPATH=`hadoop classpath`
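
A minimal sketch of that account setup, assuming a sudo-capable login and that the variable should survive new shells (the exact steps depend on your distribution):

    # create the flink user and switch to it
    sudo useradd -m flink
    sudo su - flink

    # make HADOOP_CLASSPATH available in every new shell for this user
    echo 'export HADOOP_CLASSPATH=`hadoop classpath`' >> ~/.bashrc
    source ~/.bashrc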


Part 2: Compiling Hudi 0.9.0

• Download the Hudi source code and apply patches

    # clone the hudi source
    git clone -b release-0.9.0 --depth 1 https://github.com/apache/hudi.git hudi-0.9-cherry-pick
    # enter the project directory
    cd hudi-0.9-cherry-pick
    # fetch the patch
    git fetch origin pull/3519/head:cherry-pick
    # to fix an error when closing the metastore client, the PR referenced in this issue also needs to be applied:
    # https://github.com/apache/hudi/issues/3848
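
The fetch above only creates a local branch named cherry-pick pointing at the PR head; it still has to be applied on top of release-0.9.0. A minimal sketch of one way to do that, assuming the PR consists of a small number of commits that apply cleanly (<commit-sha> is a placeholder for the ids printed by the log):

    # list the PR commits that are not yet on the release branch
    git log --oneline release-0.9.0..cherry-pick

    # apply the wanted commit(s) onto the current release-0.9.0 checkout
    git cherry-pick <commit-sha>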

• Build Hudi

  1. When building on Windows, comment out the integration test modules in the root pom.xml:

        <module>hudi-integ-test</module>
        <module>packaging/hudi-integ-test-bundle</module>

  2. In the packaging folder, edit the pom file of the hudi-flink-bundle module and change the Hive version to the one your cluster runs.


  If you are on Hive 1.x, change the properties under the flink-bundle-shade-hive1 profile; for Hive 2.x use the flink-bundle-shade-hive2 profile, and Hive 3.x works the same way.


  Since my cluster runs Hive 1.2.1, my change looks like this:

    <profile>
      <id>flink-bundle-shade-hive1</id>
      <properties>
        <hive.version>1.2.1</hive.version>
        <thrift.version>0.9.2</thrift.version>
        <flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
      </properties>
    </profile>

  3. For Hive 1.x, copy the cluster's hive-site.xml into the resources folder of the hudi-flink module (see the copy command after the source snippet below).

  The reason is that the Hive 1.x HiveConf class loads hive-site.xml from the classpath root. Here is the relevant source:

    static {
      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
      if (classLoader == null) {
        classLoader = HiveConf.class.getClassLoader();
      }
      hiveDefaultURL = classLoader.getResource("hive-default.xml");
      // Look for hive-site.xml on the CLASSPATH and log its location if found.
      hiveSiteURL = classLoader.getResource("hive-site.xml");
      hivemetastoreSiteUrl = classLoader.getResource("hivemetastore-site.xml");
      hiveServer2SiteUrl = classLoader.getResource("hiveserver2-site.xml");
      for (ConfVars confVar : ConfVars.values()) {
        vars.put(confVar.varname, confVar);
      }
    }
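
A minimal sketch of that copy, run from the root of the Hudi source tree; /etc/hive/conf is an assumption, use wherever your cluster keeps hive-site.xml:

    # put hive-site.xml on the hudi-flink classpath so HiveConf can find it
    cp /etc/hive/conf/hive-site.xml hudi-flink/src/main/resources/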


  4. Build with Maven using the command below; if your cluster runs Hive 2.x, replace flink-bundle-shade-hive1 with flink-bundle-shade-hive2:

    mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive1 -Dspark2
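
If the build succeeds, the two bundles used later in this tutorial end up under the packaging modules; a quick sanity check from the source root:

    ls packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0.jar
    ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.9.0.jar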


Part 3: Preparing the Flink and Kafka Environment

• Download the prebuilt Flink 1.12.5 package (Hudi 0.9.0 pairs with Flink 1.12.5)

Download page:

https://flink.apache.org/downloads.html#update-policy-for-old-releases

Find the matching version:

https://archive.apache.org/dist/flink/flink-1.12.5/

Download command:

    wget https://archive.apache.org/dist/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.11.tgz


• Extract the archive:

    tar -zxvf flink-1.12.5-bin-scala_2.11.tgz


• Modify the standalone cluster configuration

  1. Edit flink-conf.yaml

    # increase the slot count so there are enough slots to start the job
    taskmanager.numberOfTaskSlots: 8

    # checkpoint-related settings
    state.backend: rocksdb
    state.checkpoints.dir: hdfs:///tmp/flink/checkpoints/test-flink/
    state.savepoints.dir: hdfs:///tmp/flink/checkpoints/test-flink/
    state.backend.incremental: true
    state.checkpoint-storage: filesystem

    execution.checkpointing.interval: 15s
    execution.checkpointing.mode: EXACTLY_ONCE
    classloader.resolve-order: parent-first

    # restart-strategy must be set to failure-rate for the settings below to take effect
    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: 30
    restart-strategy.failure-rate.failure-rate-interval: 1 min
    restart-strategy.failure-rate.delay: 30 s

  2. Edit sql-client-defaults.yaml

    # hive catalog
    catalogs:
      - name: myhive
        type: hive
        hive-conf-dir: /etc/hive/conf   # the actual path may differ on your cluster
        default-database: flink_catalog

    execution:
      planner: blink
      type: streaming
      time-characteristic: event-time
      periodic-watermarks-interval: 200
      result-mode: table
      max-table-result-rows: 1000000
      max-parallelism: 128
      min-idle-state-retention: 0
      max-idle-state-retention: 0
      current-catalog: myhive
      current-database: flink_catalog


• Create the checkpoint directory

    hdfs dfs -mkdir -p hdfs:///tmp/flink/checkpoints/test-flink


• Create the flink_catalog database to hold the Flink metadata tables

    create database flink_catalog;
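
Since the myhive catalog points at the Hive metastore, this database needs to exist in Hive. A minimal sketch of creating it from the shell, assuming the hive CLI is available on this machine:

    hive -e "CREATE DATABASE IF NOT EXISTS flink_catalog;"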


• Upload the compiled hudi-flink-bundle_2.11-0.9.0.jar into Flink's lib directory

The jar sits in the target directory of the hudi-flink-bundle module under packaging.
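
A sketch of that copy, assuming the Hudi source tree and the extracted flink-1.12.5 directory sit side by side (adjust the paths to your layout):

    cp hudi-0.9-cherry-pick/packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0.jar flink-1.12.5/lib/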


• Download the Kafka connector and Hive dependency jars into the lib directory

  1. Download flink-sql-connector-hive-1.2.2_2.11-1.12.5.jar

Download page:

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/hive/

Pick the jar matching your Flink version; download command:

    wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.12.5/flink-sql-connector-hive-1.2.2_2.11-1.12.5.jar


  2. Download flink-sql-connector-kafka_2.11-1.12.5.jar

Download page:

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html

Pick the jar matching your Flink version; download command:

    wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.5/flink-sql-connector-kafka_2.11-1.12.5.jar


• Create the Kafka topic:

    flink_on_hudi_cow_test
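
The tutorial does not show the topic-creation command itself; a minimal sketch, assuming a Kafka CLI recent enough to accept --bootstrap-server and with xxx standing in for your broker list (the partition and replication counts are arbitrary choices here):

    ./kafka-topics.sh --create --bootstrap-server xxx --topic flink_on_hudi_cow_test --partitions 3 --replication-factor 2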


Part 4: Starting the Standalone Cluster

    ./bin/start-cluster.sh
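
A quick way to confirm the cluster came up; the process names below are those of a default Flink standalone deployment:

    # expect one JobManager (StandaloneSessionClusterEntrypoint) and one TaskManager (TaskManagerRunner)
    jps | grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner'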


Part 5: Starting the sql-client

    ./bin/sql-client.sh embedded

Once inside the SQL client, run the following statements:

1. Create the Kafka source table

    CREATE TABLE flink_catalog.kafka_source_test (
      user_id STRING,
      order_amount BIGINT,
      log_ts TIMESTAMP(3),
      _hoodie_is_deleted BOOLEAN,
      part STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'flink_on_hudi_cow_test',
      'properties.bootstrap.servers' = 'xxx',  -- fill in your Kafka broker list
      'scan.startup.mode' = 'latest-offset',
      'properties.group.id' = 'test',
      'format' = 'json'
    );


2. Create the Hudi sink table

    CREATE TABLE flink_catalog.hudi_test (
      user_id VARCHAR(20),
      order_amount BIGINT,
      log_ts TIMESTAMP(3),
      _hoodie_is_deleted BOOLEAN,
      `part` VARCHAR(20)
    )
    PARTITIONED BY (`part`)
    WITH (
      'connector' = 'hudi',
      'path' = 'hdfs:///apps/hive/warehouse/test.db/hudi_test',  -- path where Hudi writes its files
      'table.type' = 'COPY_ON_WRITE',
      'write.precombine.field' = 'log_ts',  -- when records share a key, Hudi keeps the latest one ordered by this column
      'hoodie.datasource.write.recordkey.field' = 'user_id',  -- Hudi record key
      'write.bucket_assign.tasks' = '2',
      'write.tasks' = '2',
      'hive_sync.enable' = 'true',  -- enable Hive sync
      'hive_sync.mode' = 'hms',  -- sync mode, either hms or jdbc; see the Hudi docs for details
      'hive_sync.db' = 'test',  -- Hive database to sync into
      'hive_sync.table' = 'hudi_test',  -- Hive table to sync into
      'hive_sync.metastore.uris' = 'thrift://xxx'  -- fill in your Hive metastore address
    );


3. Consume from Kafka and write into Hudi

    insert into flink_catalog.hudi_test select * from flink_catalog.kafka_source_test;
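
The statement submits a streaming job to the standalone cluster; a quick way to confirm it is running, from the Flink home directory:

    ./bin/flink list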


Part 6: Producing Data to Kafka

    ./kafka-console-producer.sh --broker-list xxx --topic flink_on_hudi_cow_test

Sample data:

                                            {"user_id":"a1","order_amount":113.0,"log_ts":"2020-06-30 12:12:12","_hoodie_is_deleted":"false","part":"par1"}
                                            {"user_id":"a2","order_amount":113.0,"log_ts":"2020-06-30 12:12:12","_hoodie_is_deleted":"false","part":"par1"}
                                            {"user_id":"a2","order_amount":113.0,"log_ts":"2020-06-30 12:12:12","_hoodie_is_deleted":"true","part":"par1"}
                                            {"user_id":"a3","order_amount":160.0,"log_ts":"2020-06-30 12:22:16","_hoodie_is_deleted":"false","part":"par2"}
                                            {"user_id":"a5","order_amount":160.0,"log_ts":"2020-06-30 12:22:16","_hoodie_is_deleted":"false","part":"par3"}
                                            {"user_id":"a8","order_amount":260.0,"log_ts":"2020-06-30 13:22:16","_hoodie_is_deleted":"false","part":"par1"}

The _hoodie_is_deleted field is the flag Hudi uses to decide whether a record is a delete; the exact logic lives in Hudi's payload class. true marks a delete, false a normal write.


In the sample above, a2 appears twice and the second record has _hoodie_is_deleted set to true, so a2 ends up deleted and does not show up in the final table.


If all goes well, the Kafka records will be written to the Hudi table after a dozen seconds or so.


Why only a dozen or so seconds? Hudi flushes Parquet files only after Flink completes a checkpoint, and we configured the standalone cluster with a 15-second checkpoint interval:

    execution.checkpointing.interval: 15s


The JobManager log shows entries like these:

    2022-02-14 11:15:18,987 INFO  org.apache.hudi.hive.HiveSyncTool                            [] - Hive table hudi_test is not found. Creating it
    2022-02-14 11:15:19,672 INFO  org.apache.hudi.hive.HiveSyncTool                            [] - Schema sync complete. Syncing partitions for hudi_test
    2022-02-14 11:15:19,672 INFO  org.apache.hudi.hive.HiveSyncTool                            [] - Last commit time synced was found to be null
    2022-02-14 11:15:19,672 INFO  org.apache.hudi.sync.common.AbstractSyncHoodieClient         [] - Last commit time synced is not known, listing all partitions in hdfs:///apps/hive/warehouse/test.db/hudi_test,FS :DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1401082153_30, ugi=flink (auth:SIMPLE)]]
    2022-02-14 11:15:19,685 INFO  org.apache.hudi.hive.HiveSyncTool                            [] - Storage partitions scan complete. Found 3
    2022-02-14 11:15:19,706 INFO  org.apache.hudi.hive.HiveSyncTool                            [] - New Partitions [par1, par2, par3]
    2022-02-14 11:15:19,706 INFO  org.apache.hudi.hive.ddl.HMSDDLExecutor                      [] - Adding partitions 3 to table hudi_test
    2022-02-14 11:15:19,782 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 3 (type=CHECKPOINT) @ 1644808519775 for job cdbc1b70c3e6a1d337e80365351a72b8.
    2022-02-14 11:15:19,783 INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator          [] - Executor executes action [taking checkpoint 3] success!
    2022-02-14 11:15:19,805 INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator          [] - Executor executes action [handle write metadata event for instant 20220214111517] success!
    2022-02-14 11:15:19,807 INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator          [] - Executor executes action [handle write metadata event for instant 20220214111517] success!
    2022-02-14 11:15:19,846 INFO  org.apache.hudi.hive.HiveSyncTool                            [] - Changed Partitions []
    2022-02-14 11:15:19,846 INFO  org.apache.hudi.hive.ddl.HMSDDLExecutor                      [] - No partitions to change for hudi_test
    2022-02-14 11:15:19,900 INFO  org.apache.hudi.hive.HiveSyncTool                            [] - Sync complete for hudi_test

As the log shows, because Hive sync is enabled, Hudi checks whether the table exists and creates it for us if it does not.


Part 7: Spark Query Test

Querying from Spark requires another compiled jar:

hudi-utilities-bundle_2.11-0.9.0.jar. The jar sits in the target directory of the hudi-utilities-bundle module under the packaging folder.

Run:

    spark-shell --master yarn --queue test --num-executors 2 --executor-cores 2 --executor-memory 6g --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.hive.convertMetastoreParquet=false' --jars hudi-utilities-bundle_2.11-0.9.0.jar

    scala> spark.sql("select * from test.hudi_test limit 10").show(10,false)


That completes the end-to-end walkthrough of real-time data lake ingestion with Flink on Hudi.


