
Apache Hudi Getting Started Guide (with Code Examples)

ApacheHudi 2021-04-20

1. What is Apache Hudi

Hudi is a Spark library that provides an update solution for big data. Big data storage has no update in the traditional sense, only append and rewrite; Hudi takes the rewrite approach.

Advantages of using Hudi:

  • Uses a Bloom filter plus a secondary lookup to quickly determine whether a record is an update or an insert
  • Updates are scoped to the file level, not the table level
  • File sizes are kept in line with the HDFS block size
  • Data files are stored in Parquet, taking full advantage of columnar storage (an implementation of the Dremel paper)
  • Provides an extensible big data update framework
  • Parallelism is controlled by Spark

For a detailed introduction, see the official Hudi documentation: http://hudi.apache.org/cn/docs/0.5.0-quick-start-guide.html

2. Building Hudi

  1. git clone https://github.com/apache/incubator-hudi.git && cd incubator-hudi

  2. mvn clean package -DskipTests -DskipITs

Note: Hudi was built in a Linux environment for this article. On Windows you must add -DskipITs, otherwise the build packages the Docker files and tries to start services with Linux commands, which fails. On Linux, if you want to test with Docker, you can drop this flag.

3. Prerequisite Environment Setup

All versions were chosen to match the Spark, Hive, Hadoop, and Presto versions declared in the pom of the current master branch (hudi-0.5.2-SNAPSHOT).

Versions and download links:

  • hadoop 2.7.3: https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/hadoop-2.7.3-src.tar.gz
  • spark 2.4.4: https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
  • hive 2.3.1: http://archive.apache.org/dist/hive/hive-2.3.1/apache-hive-2.3.1-bin.tar.gz
  • presto 0.217: https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.217/presto-server-0.217.tar.gz
  • presto-cli-0.217-executable.jar: https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.217/presto-cli-0.217-executable.jar

Note: small differences in minor versions do not affect usage. If a Spark job fails with compatibility errors, sort out the conflicting dependencies.

4. Hive and Presto Integration

4.1 Hive

Querying Hudi data from Hive essentially means creating an external table in Hive whose location points to the HDFS path, while Hudi supplies its own InputFormat and OutputFormat implementations: on read, Hudi consults its metadata to decide which Parquet files to load, and on write it records new metadata under the HDFS path. To query Hudi from Hive, the compiled bundle jar therefore has to be placed under $HIVE_HOME/lib; otherwise the InputFormat and OutputFormat classes cannot be found at query time. The external table definition looks like this:

CREATE EXTERNAL TABLE `test_partition`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_file_name` string,
  `id` string,
  `oid` string,
  `name` string,
  `dt` string,
  `isdeleted` string,
  `lastupdatedttm` string,
  `rowkey` string)
PARTITIONED BY (
  `_hoodie_partition_path` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://hj:9000/tmp/hudi'
TBLPROPERTIES (
  'transient_lastDdlTime'='1582111004')

To integrate Hudi with Hive, copy the Hudi bundle jar into Hive's lib directory:

cp ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar $HIVE_HOME/lib

4.2 Presto

Presto integrates with Hudi through the Hive catalog and queries the same Hive external tables. To integrate, copy the Hudi bundle jar into Presto's hive-hadoop2 plugin directory.

To integrate Hudi with Presto, copy the Hudi bundle jar under Presto's hive-hadoop2 plugin:

cp ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar $PRESTO_HOME/plugin/hive-hadoop2/

5. Hudi Code Examples

5.1 Copy-on-Write Mode (the default)

5.1.1 insert (initial data load)

// insert without a partition column
@Test
def insert(): Unit = {
  val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  val insertData = spark.read.parquet("/tmp/1563959377698.parquet")
  insertData.write.format("org.apache.hudi")
    // record key column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    // pre-combine column (data update time)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    // parallelism settings
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    // table name
    .option(HoodieWriteConfig.TABLE_NAME, "test")
    .mode(SaveMode.Overwrite)
    // target path
    .save("/tmp/hudi")
}

// insert with a partition column
@Test
def insertPartition(): Unit = {
  val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  // read a text file into a DataFrame
  val insertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_insert_data.txt")
  insertData.write.format("org.apache.hudi")
    // record key column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    // pre-combine column (data update time)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    // partition column
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
    // update the partition path of existing records when their partition value changes (requires a global index)
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; GLOBAL_BLOOM is required so records are still found after a partition change
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // parallelism settings
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
    .mode(SaveMode.Overwrite)
    .save("/tmp/hudi")
}

5.1.2 upsert (update existing records, insert new ones)

// upsert without a partition column
@Test
def upsert(): Unit = {
  val spark = SparkSession.builder.appName("hudi upsert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  val insertData = spark.read.parquet("/tmp/1563959377699.parquet")
  insertData.write.format("org.apache.hudi")
    // record key column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    // pre-combine column (data update time)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    // table name
    .option(HoodieWriteConfig.TABLE_NAME, "test")
    // parallelism settings
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append)
    // target path
    .save("/tmp/hudi")
}

// upsert with a partition column
@Test
def upsertPartition(): Unit = {
  val spark = SparkSession.builder.appName("upsert partition").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_update_data.txt")
  upsertData.write.format("org.apache.hudi")
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    // partition column
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
    .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append)
    .save("/tmp/hudi")
}

5.1.3 delete (deleting data)

@Test
def delete(): Unit = {
  val spark = SparkSession.builder.appName("delta insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  val deleteData = spark.read.parquet("/tmp/1563959377698.parquet")
  deleteData.write.format("org.apache.hudi")
    // record key column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    // pre-combine column (data update time)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    // table name
    .option(HoodieWriteConfig.TABLE_NAME, "test")
    // hard-delete payload: records whose keys match are removed
    .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
    // the delete is executed as an append write to the same path
    .mode(SaveMode.Append)
    .save("/tmp/hudi")
}

Deletes come in two flavors, soft and hard; the configuration is described here: http://hudi.apache.org/cn/docs/0.5.0-writing_data.html#%E5%88%A0%E9%99%A4%E6%95%B0%E6%8D%AE
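The test above is a hard delete via EmptyHoodieRecordPayload. A soft delete, as described in that page, keeps the record key and nulls out the other fields through a normal upsert. The following is only a sketch, not code from the original project: the column list follows the test table schema shown earlier and the write options mirror the examples above.

// Soft-delete sketch (illustrative): keep the key, precombine and partition columns,
// null out the remaining payload columns, then write them back as a normal upsert.
@Test
def softDelete(): Unit = {
  import org.apache.spark.sql.functions.lit
  import org.apache.spark.sql.types.StringType

  val spark = SparkSession.builder.appName("hudi soft delete").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  val toDelete = spark.read.parquet("/tmp/1563959377698.parquet")
  // columns follow the test table used above; adjust to your schema
  val softDeleteData = toDelete.select(
    lit(null).cast(StringType).as("id"),
    lit(null).cast(StringType).as("oid"),
    lit(null).cast(StringType).as("name"),
    toDelete("dt"),
    lit(null).cast(StringType).as("isdeleted"),
    toDelete("lastupdatedttm"),
    toDelete("rowkey"))
  softDeleteData.write.format("org.apache.hudi")
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    .option(HoodieWriteConfig.TABLE_NAME, "test")
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append)
    .save("/tmp/hudi")
}

Unlike the hard delete, the records stay in the table with only the key (and precombine/partition) columns populated, so downstream consumers can observe the deletion.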

5.1.4 query (reading data)

@Test
def query(): Unit = {
  val basePath = "/tmp/hudi"
  val spark = SparkSession.builder.appName("query insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  val tripsSnapshotDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*")

  tripsSnapshotDF.show()
}
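This is a snapshot query of the latest state of the table. The 0.5.0 quick-start guide linked in section 1 also describes an incremental query through the Spark datasource; the sketch below is based on that guide rather than the original article, uses the 0.5.x DataSourceReadOptions constants, and the begin time is just a placeholder commit timestamp (newer Hudi releases rename these options, so check your version's DataSourceReadOptions).

// Incremental query sketch (illustrative): load only records committed after beginTime.
@Test
def queryIncremental(): Unit = {
  val basePath = "/tmp/hudi"
  val spark = SparkSession.builder.appName("query incremental").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  // placeholder commit time; take a real instant from the table's .hoodie timeline
  val beginTime = "20200220094506"
  val incrementalDF = spark.read.format("org.apache.hudi")
    .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
    .load(basePath)
  incrementalDF.show()
}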

5.1.5 Syncing to Hive

@Test
def hiveSync(): Unit = {
  val spark = SparkSession.builder.appName("delta hiveSync").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/hive_sync.txt")

  upsertData.write.format("org.apache.hudi")
    // record key column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    // pre-combine column (data update time)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    // partition column
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
    // hive database to sync to
    .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hj_repl")
    // hive table to sync to
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition")
    // register the dataset and sync it to hive
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
    // whether the partition directory of existing data changes when a record's partition value changes
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // partition column to sync
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
    // hive jdbc url used for the sync
    .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
    // hudi table name
    .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
    // class used to extract partition field values into hive partition columns; here the partition value is used as-is
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
    // index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; GLOBAL_BLOOM is required so records are still found after a partition change
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // parallelism settings
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append)
    .save("/tmp/hudi")
}

@Test
def hiveSyncMergeOnReadByUtil(): Unit = {
  val args: Array[String] = Array("--jdbc-url", "jdbc:hive2://hj:10000", "--partition-value-extractor", "org.apache.hudi.hive.MultiPartKeysValueExtractor", "--user", "hive", "--pass", "hive", "--partitioned-by", "dt", "--base-path", "/tmp/hudi_merge_on_read", "--database", "hj_repl", "--table", "test_partition_merge_on_read")
  HiveSyncTool.main(args)
}

The sync can be done either through Spark (as above) or with the HiveSyncTool from the hudi-hive module; HiveSyncTool is the class that run_sync_tool.sh invokes. When syncing Hudi to Hive, make sure the target Hive table does not already exist; the sync is essentially the process of creating the external table.

5.1.6 Querying the read-optimized view and the incremental view from Hive

@Test
def hiveViewRead(): Unit = {
  // target table
  val sourceTable = "test_partition"
  // start commit time for the incremental view
  val fromCommitTime = "20200220094506"
  // number of commit batches to pull after that point
  val maxCommits = "2"

  Class.forName("org.apache.hive.jdbc.HiveDriver")
  val prop = new Properties()
  prop.put("user", "hive")
  prop.put("password", "hive")
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
  val stmt = conn.createStatement
  // set the incremental view parameters
  stmt.execute("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat")
  // allow queries without a partition predicate
  stmt.execute("set hive.strict.checks.large.query=false")
  // don't gather stats for the table created
  stmt.execute("set hive.stats.autogather=false")
  // set the hoodie consume mode to incremental
  stmt.execute("set hoodie." + sourceTable + ".consume.mode=INCREMENTAL")
  // set the commit time to start from
  stmt.execute("set hoodie." + sourceTable + ".consume.start.timestamp=" + fromCommitTime)
  // set the number of commits to pull
  stmt.execute("set hoodie." + sourceTable + ".consume.max.commits=" + maxCommits)

  val rs = stmt.executeQuery("select * from " + sourceTable)
  val metaData = rs.getMetaData
  val count = metaData.getColumnCount

  while (rs.next()) {
    for (i <- 1 to count) {
      println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
    }
    println("-----------------------------------------------------------")
  }

  rs.close()
  stmt.close()
  conn.close()
}

For the read-optimized view, simply leave out the incremental view parameters.

5.1.7 Querying the read-optimized view from Presto (incremental views are not yet supported)

@Test
def prestoViewRead(): Unit = {
  // target table
  val sourceTable = "test_partition"
  Class.forName("com.facebook.presto.jdbc.PrestoDriver")
  val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
  val stmt = conn.createStatement
  val rs = stmt.executeQuery("select * from " + sourceTable)
  val metaData = rs.getMetaData
  val count = metaData.getColumnCount

  while (rs.next()) {
    for (i <- 1 to count) {
      println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
    }
    println("-----------------------------------------------------------")
  }

  rs.close()
  stmt.close()
  conn.close()
}

5.2 Merge-on-Read Mode

5.2.1 insert (inserting data)

@Test
def insertPartitionMergeOnRead(): Unit = {
  val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  // read a text file into a DataFrame
  val insertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_insert_data.txt")
  insertData.write.format("org.apache.hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
    // record key column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    // pre-combine column (data update time)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    // partition column
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
    // whether the partition directory of existing data changes when a record's partition value changes
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; GLOBAL_BLOOM is required so records are still found after a partition change
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // parallelism settings
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option(HoodieWriteConfig.TABLE_NAME, "test_partition_merge_on_read")
    .mode(SaveMode.Overwrite)
    .save("/tmp/hudi_merge_on_read")
}

The key for merge on read is adding the option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) parameter; the update and delete operations are otherwise the same as for copy on write and are not repeated here.

5.2.2 Syncing to Hive

@Test
def hiveSyncMergeOnRead(): Unit = {
  val spark = SparkSession.builder.appName("delta hiveSync").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/hive_sync.txt")

  upsertData.write.format("org.apache.hudi")
    // configure merge on read
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
    // record key column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
    // pre-combine column (data update time)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
    // partition column
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
    // hive database to sync to
    .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hj_repl")
    // hive table to sync to
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition_merge_on_read")
    // register the dataset and sync it to hive
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
    // whether the partition directory of existing data changes when a record's partition value changes
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // partition column to sync
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
    // hive jdbc url used for the sync
    .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
    // hudi table name
    .option(HoodieWriteConfig.TABLE_NAME, "test_partition_merge_on_read")
    // class used to extract partition field values into hive partition columns; here the partition value is used as-is
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
    // index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; GLOBAL_BLOOM is required so records are still found after a partition change
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // parallelism settings
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append)
    .save("/tmp/hudi_merge_on_read")
}

This works the same way as for copy on write, except that merge on read produces two external tables, suffixed ro and rt: ro is the read-optimized view and rt is the real-time view.

5.2.3 Querying the read-optimized view (_ro suffix) and the real-time view (_rt suffix) from Hive

/**
 * merge on read: real-time view query
 */
@Test
def mergeOnReadRealtimeViewByHive(): Unit = {
  // target table
  val sourceTable = "test_partition_merge_on_read_rt"

  Class.forName("org.apache.hive.jdbc.HiveDriver")
  val prop = new Properties()
  prop.put("user", "hive")
  prop.put("password", "hive")
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
  val stmt = conn.createStatement

  val rs = stmt.executeQuery("select * from " + sourceTable)
  val metaData = rs.getMetaData
  val count = metaData.getColumnCount

  while (rs.next()) {
    for (i <- 1 to count) {
      println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
    }
    println("-----------------------------------------------------------")
  }

  rs.close()
  stmt.close()
  conn.close()
}

/**
 * merge on read: read-optimized view query
 */
@Test
def mergeOnReadReadoptimizedViewByHive(): Unit = {
  // target table
  val sourceTable = "test_partition_merge_on_read_ro"

  Class.forName("org.apache.hive.jdbc.HiveDriver")
  val prop = new Properties()
  prop.put("user", "hive")
  prop.put("password", "hive")
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
  val stmt = conn.createStatement

  val rs = stmt.executeQuery("select * from " + sourceTable)
  val metaData = rs.getMetaData
  val count = metaData.getColumnCount

  while (rs.next()) {
    for (i <- 1 to count) {
      println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
    }
    println("-----------------------------------------------------------")
  }

  rs.close()
  stmt.close()
  conn.close()
}

5.2.4 Querying the read-optimized view (_ro suffix) and the real-time view (_rt suffix) from Presto

/**
 * presto merge on read: real-time view query
 */
@Test
def mergeOnReadRealtimeViewByPresto(): Unit = {
  // target table
  val sourceTable = "test_partition_merge_on_read_rt"
  Class.forName("com.facebook.presto.jdbc.PrestoDriver")
  val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
  val stmt = conn.createStatement
  val rs = stmt.executeQuery("select * from " + sourceTable)
  val metaData = rs.getMetaData
  val count = metaData.getColumnCount

  while (rs.next()) {
    for (i <- 1 to count) {
      println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
    }
    println("-----------------------------------------------------------")
  }

  rs.close()
  stmt.close()
  conn.close()
}

/**
 * presto merge on read: read-optimized view query
 */
@Test
def mergeOnReadReadoptimizedViewByPresto(): Unit = {
  // target table
  val sourceTable = "test_partition_merge_on_read_ro"
  Class.forName("com.facebook.presto.jdbc.PrestoDriver")
  val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
  val stmt = conn.createStatement
  val rs = stmt.executeQuery("select * from " + sourceTable)
  val metaData = rs.getMetaData
  val count = metaData.getColumnCount

  while (rs.next()) {
    for (i <- 1 to count) {
      println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
    }
    println("-----------------------------------------------------------")
  }

  rs.close()
  stmt.close()
  conn.close()
}

6. Troubleshooting Notes

1. Merge on Read configuration

Merge on read only takes effect when configured through option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL); configuring option(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()) instead does not take effect.

2. Spark pom dependencies

Do not pull in the spark-hive dependency: it bundles Hive 1.2.1 jars, while Hudi requires Hive 2.x. If you really have to use it, exclude the conflicting artifacts, as sketched below.
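The original article does not show the build file, so the following is only an illustration assuming an sbt build (with Maven, an equivalent <exclusions> block in the pom serves the same purpose); verify the actual group ids of the bundled Hive 1.2.x artifacts in your dependency tree (for example with mvn dependency:tree) before copying it.

// build.sbt sketch (illustrative): keep spark-hive but strip the bundled Hive 1.2.x artifacts
libraryDependencies += ("org.apache.spark" %% "spark-hive" % "2.4.4")
  .excludeAll(
    ExclusionRule(organization = "org.apache.hive"),
    ExclusionRule(organization = "org.spark-project.hive"))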

3. Hive view sync

When syncing the Hive views from code, put the hive-site.xml configuration file under resources; otherwise syncing to the Hive metastore will fail.

Test code on GitHub: https://github.com/hj2016/hudi-test

