暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

实战 | 将Kafka流式数据摄取至Hudi

ApacheHudi 2021-04-20
2717

1. 引入

Hudi支持以下存储数据的视图

  • 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能

  • 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。

  • 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。

使用Hudi自带的DeltaStreamer工具写数据到Hudi,开启--enable-hive-sync 即可同步数据到hive表。

2. 步骤

2.1 DeltaStreamer启动命令

  1. spark-submit --master yarn \

  2. --driver-memory 1G \

  3. --num-executors 2 \

  4. --executor-memory 1G \

  5. --executor-cores 4 \

  6. --deploy-mode cluster \

  7. --conf spark.yarn.executor.memoryOverhead=512 \

  8. --conf spark.yarn.driver.memoryOverhead=512 \

  9. --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls .../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \

  10. --props hdfs://../kafka.properties \

  11. --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \

  12. --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \

  13. --target-base-path hdfs://../business \

  14. --op UPSERT \

  15. --target-table business \ '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'

  16. --enable-hive-sync \ '开启同步至hive'

  17. --table-type MERGE_ON_READ \

  18. --source-ordering-field create_time \

  19. --source-limit 5000000

2.2 kafka.properties配置实例

  1. hoodie.upsert.shuffle.parallelism=2

  2. hoodie.insert.shuffle.parallelism=2

  3. hoodie.bulkinsert.shuffle.parallelism=2

  4. hoodie.datasource.write.recordkey.field=uuid

  5. hoodie.datasource.write.partitionpath.field=create_time

  6. hoodie.datasource.write.precombine.field=update_time

  7. hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc

  8. hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc

  9. hoodie.deltastreamer.source.kafka.topic=t_business_topic

  10. group.id=t_business_group

  11. bootstrap.servers=localhost

  12. auto.offset.reset=latest

  13. hoodie.parquet.max.file.size=134217728

  14. hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator

  15. hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING

  16. hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss

  17. hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd

  18. hoodie.datasource.hive_sync.database=dwd

  19. hoodie.datasource.hive_sync.table=test

  20. hoodie.datasource.hive_sync.username=用户名

  21. hoodie.datasource.hive_sync.password=密码

  22. hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....

  23. hoodie.datasource.hive_sync.partition_fields=分区字段

3. 不同模式

3.1 MOR模式

如果使用MOR模式写入数据会在Hive的dwd库下面生成两张表。分别是testro 和 testrt testrt表支持:快照视图和增量视图查询 testro表支持:读优化视图查询

3.1.1 使用Spark查询

  1. spark-shell --master yarn \

  2. --driver-memory 1G \

  3. --num-executors 1 \

  4. --executor-memory 1G \

  5. --executor-cores 1 \

  6. --jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \

  7. --conf spark.sql.hive.convertMetastoreParquet=false '在进行快照视图查询的时候需要添加此配置'


  8. #快照视图

  9. spark.sql("select count(*) from dwd.test_rt").show()

  10. #读优化视图

  11. spark.sql("select count(*) from dwd.test_ro").show()

  12. #增量视图

  13. saprk sql不支持

3.1.2 使用Hive查询

  1. beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx -p t3cx \

  2. --hiveconf hive.stats.autogather=false \


  3. #读优化查询

  4. select * from dwd.test_ro;

  5. #快照查询

  6. select * from dwd.test_rt;

  7. #增量查询

  8. set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

  9. set hoodie.test.consume.mode=INCREMENTAL;

  10. set hoodie.test.consume.max.commits=3;

  11. set hoodie.test.consume.start.timestamp=20200427114546;

  12. select count(*) from dwd.test_rt where `_hoodie_commit_time` > '20200427114546';


  13. #注意:

  14. #1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,clouder用户需要必须要重新安装mr所需要的jar

  15. #2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定

3.2 COW模式

如果使用COW模式写入数据,会在Hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图

3.2.1 使用Spark查询

  1. spark-shell --master yarn \

  2. --driver-memory 1G \

  3. --num-executors 1 \

  4. --executor-memory 1G \

  5. --executor-cores 1 \

  6. --jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \

  7. --conf spark.sql.hive.convertMetastoreParquet=false


  8. #快照视图

  9. spark.sql("select count(*) from dwd.test").show()

  1. //增量视图 无需遍历全部数据,即可获取时间大于20200426140637的数据

  2. import org.apache.hudi.DataSourceReadOptions

  3. val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200426140637").load("hdfs://..../t3_trip_t_business15")

  4. spark.sql("select count(*) from dwd.test_rt where _hoodie_commit_time>'20200426140637'").show()

3.2.2 使用Hive查询

  1. beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx -p t3cx \

  2. --hiveconf hive.stats.autogather=false \


  3. #快照查询

  4. select count(*) from dwd.test;

  5. #增量查询

  6. set hoodie.test.consume.mode=INCREMENTAL;

  7. set hoodie.test.consume.max.commits=3;

  8. set hoodie.test.consume.start.timestamp=20200427114546;

  9. select count(*) from dwd.test where `_hoodie_commit_time` > '20200427114546';

4. 总结

DeltaStreamer是Hudi提供的非常实用的工具,通过DeltaStreamer可以将Kafka、DFS上的数据导入Hudi,而本篇博文主要讲解了如何使用DeltaStreamer将数据从Kafka导入Hudi,并演示了如何使用Spark和Hive查询Hudi数据。


文章转载自ApacheHudi,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论