Preface
This post summarizes how to use the Hudi DeltaStreamer tool to read data from an external source and write it into a new Hudi table. HoodieDeltaStreamer is part of hudi-utilities-bundle; following "Apache Hudi 入门学习总结", copying the hudi-spark-bundle jar into the $SPARK_HOME/jars directory is enough.
HoodieDeltaStreamer provides ways to ingest from different sources such as DFS or Kafka, with the following capabilities:
Exactly-once ingestion of new events from Kafka, and incremental imports from Sqoop, HiveIncrementalPuller output, or multiple files under a DFS folder
Support for json, avro, or custom record types for the incoming data
Manages checkpoints, rollback and recovery
Leverages Avro schemas from DFS or the Confluent schema registry
Support for plugging in custom transformations
Besides the items listed on the official site, it can also read Hive tables (historical data) and convert them into Hudi tables; there are other utility classes in the source code worth exploring on your own.
The command-line options describe these capabilities in more detail:
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --help

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]).
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor. File paths of these files
                              in executors can be accessed via SparkFiles.get(fileName).

  --conf PROP=VALUE           Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.
                              This argument does not work with --principal / --keytab.

  --help, -h                  Show this help message and exit.
  --verbose, -v               Print additional debug output.
  --version,                  Print the version of current Spark.

 Cluster deploy mode only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone and Mesos only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone and YARN only:
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

 YARN-only:
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
                              If dynamic allocation is enabled, the initial number of
                              executors will be at least NUM.
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
                              secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above. This keytab will be copied to
                              the node running the Application Master via the Secure
                              Distributed Cache, for renewing the login tickets and the
                              delegation tokens periodically.
Newer versions should support more parameters; see the official docs: https://hudi.apache.org/cn/docs/hoodie_deltastreamer
Hive setup
In Ambari, set hive.resultset.use.unique.column.names=false and restart Hive.
SqlSource
Here SqlSource is used to convert a historical Hive table into a Hudi table. SqlSource comes first because the other source types all require schema-related configuration, which is cumbersome: JdbcbasedSchemaProvider needs jdbcUrl, user, password, table, and so on, and FilebasedSchemaProvider needs the path of a schema file such as /path/source.avsc. Whether you configure a JDBC connection or generate an avsc file, it is tedious, so I wanted a source that does not require a schema. Searching the source code turned up SqlSource, which meets this need, but in version 0.9.0 it has a bug and cannot be used directly; fortunately a small change to the corresponding source code fixes it. There are other sources that do not require a schema, such as ParquetDFSSource and CsvDFSSource, which, like SqlSource, are subclasses of RowSource, but they are limited to specific file formats and are not as general as SqlSource, which only requires a Hive table; this matches our need to convert Hive tables into Hudi tables.
Create the historical Hive table
create database test location '/test';
create table test.test_source (
  id int,
  name string,
  price double,
  dt string,
  ts bigint
);
insert into test.test_source values (105,'hudi', 10.0,'2021-05-05',100);
Create the Hudi target table with Spark SQL
create database hudi location '/hudi';
create table hudi.test_hudi_target (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
partitioned by (dt)
options (
  primaryKey = 'id',
  preCombineField = 'ts',
  type = 'cow'
);
The table is created with Spark SQL in advance because, although HoodieDeltaStreamer can create the table automatically when Hive sync is configured, some parameters do not take effect, such as hoodie.datasource.hive_sync.create_managed_table and hoodie.datasource.hive_sync.serde_properties; setting them in the properties file or via --hoodie-conf does not work either. Reading the source code shows that version 0.9.0 does not support them (arguably a bug), which does not meet our requirement of a managed table with a primary key. So we first create the table with Spark SQL and then convert the data with HoodieDeltaStreamer.
The latest version supports these parameters, PR: https://github.com/apache/hudi/pull/4175
Configuration files
common.properties
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
sql_source.properties
include=common.properties
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=dt
# for a non-partitioned table: hoodie.datasource.write.partitionpath.field=
hoodie.deltastreamer.source.sql.sql.query = select * from test.test_source
# Hive sync related configuration
hoodie.datasource.hive_sync.table=test_hudi_target
hoodie.datasource.hive_sync.database=hudi
## optional for non-partitioned tables
hoodie.datasource.hive_sync.partition_fields=dt
## managed table (external by default); not supported in 0.9.0
hoodie.datasource.hive_sync.create_managed_table = true
## not supported in 0.9.0
hoodie.datasource.hive_sync.serde_properties = primaryKey=id
Command
spark-submit --conf "spark.sql.catalogImplementation=hive" \
--master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 4G --driver-cores 2 \
--principal spark/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/hdp/3.1.0.0-78/spark2/jars/hudi-utilities-bundle_2.11-0.9.0.jar \
--props file:///opt/dkl/sql_source.properties \
--target-base-path /hudi/test_hudi_target \
--target-table test_hudi_target \
--op BULK_INSERT \
--table-type COPY_ON_WRITE \
--source-ordering-field ts \
--source-class org.apache.hudi.utilities.sources.SqlSource \
--enable-sync \
--checkpoint earliest \
--hoodie-conf 'hoodie.datasource.hive_sync.create_managed_table = true' \
--hoodie-conf 'hoodie.datasource.hive_sync.serde_properties = primaryKey=id'
Both enable-hive-sync and enable-sync turn on Hive sync, but enable-hive-sync is deprecated, so enable-sync is recommended.
The parameter spark.sql.catalogImplementation=hive must be added because the Spark session built in the source code does not enable Hive support (enableHiveSupport) by default, and enableHiveSupport itself works by setting spark.sql.catalogImplementation=hive.
After the job finishes, query the target table and you will see that the data has been extracted from the source table into the target table, for example:
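A quick check in Spark SQL could look like this (a sketch; the expected row is the one inserted into test.test_source earlier, and the Hudi meta columns are left out of the select):
-- verify the SqlSource run (example query)
select id, name, price, ts, dt from hudi.test_hudi_target;
-- expected: 105  hudi  10.0  100  2021-05-05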
Note that --checkpoint is needed even though SqlSource by design has no checkpoint and in principle should not require the parameter; without it the run logs: No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(Optional.empty). New Checkpoint=(null), and no data is extracted.
Source code walkthrough:
    if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
      LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=("
          + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
      return null;
    }
When checkpointStr equals resumeCheckpointStr, it is considered that there is no new data. checkpointStr is the checkpoint coming from the source, while resumeCheckpointStr is the one we resolve from the target table based on configuration.
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch()
Here checkpointStr ultimately comes from fetchNextBatch in the SqlSource class, whose return value hardcodes the checkpoint to null: return Pair.of(Option.of(source), null);
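For reference, a simplified sketch (not verbatim) of what SqlSource#fetchNextBatch looks like in 0.9.0; the SQL text comes from hoodie.deltastreamer.source.sql.sql.query and the returned checkpoint is always null, regardless of the previous checkpoint:
  // simplified sketch of org.apache.hudi.utilities.sources.SqlSource#fetchNextBatch (0.9.0), not verbatim
  @Override
  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
    // sourceSql is read from hoodie.deltastreamer.source.sql.sql.query
    Dataset<Row> source = sparkSession.sql(sourceSql);
    // the new checkpoint is hardcoded to null, which later triggers the "Nothing to commit" path
    return Pair.of(Option.of(source), null);
  }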
The logic for resumeCheckpointStr is that when the target table is empty, cfg.checkpoint is returned. The concrete code is shown below (this is the latest master code, since the 0.9.0 logic is written less clearly, but the overall logic is the same):
  /**
   * Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from.
   * @param commitTimelineOpt commit timeline of interest.
   * @return the checkpoint to resume from if applicable.
   * @throws IOException
   */
  private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
    Option<String> resumeCheckpointStr = Option.empty();
    Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
    if (lastCommit.isPresent()) {
      // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
      Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get());
      if (commitMetadataOption.isPresent()) {
        HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
        LOG.debug("Checkpoint reset from metadata: " + commitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
        if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
            || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
          resumeCheckpointStr = Option.of(cfg.checkpoint);
        } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
          //if previous checkpoint is an empty string, skip resume use Option.empty()
          resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
        } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
            HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
          throw new HoodieDeltaStreamerException(
              "Unable to find previous checkpoint. Please double check if this table "
                  + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
                  + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
                  + commitMetadata.toJsonString());
        }
        // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
          props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
        }
      } else if (cfg.checkpoint != null) { // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
        resumeCheckpointStr = Option.of(cfg.checkpoint);
      }
    }
    return resumeCheckpointStr;
  }
That is why --checkpoint earliest was added. But then SqlSource, by default, can only ingest once; running it again, or continuing with any other HoodieDeltaStreamer source for incremental ingestion, throws the following exception:
ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down
org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20220514205049__commit__COMPLETED]}, Instants :[[20220514205049__commit__COMPLETED]], CommitMetadata={
  "partitionToWriteStats" : {
    "dt=2021-05-05" : [ {
      "fileId" : "487e265e-21f2-4830-9c54-e91bdae6e496-0",
      "path" : "dt=2021-05-05/487e265e-21f2-4830-9c54-e91bdae6e496-0_0-5-5_20220514205049.parquet",
      "prevCommit" : "null",
      "numWrites" : 1,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 1,
      "totalWriteBytes" : 435208,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "dt=2021-05-05",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 435208,
      "minEventTime" : null,
      "maxEventTime" : null
    } ]
  },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"price\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"dt\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null}]}",
    "deltastreamer.checkpoint.reset_key" : "earliest",
    "deltastreamer.checkpoint.key" : null
  },
  "operationType" : "BULK_INSERT",
  "fileIdAndRelativePaths" : {
    "487e265e-21f2-4830-9c54-e91bdae6e496-0" : "dt=2021-05-05/487e265e-21f2-4830-9c54-e91bdae6e496-0_0-5-5_20220514205049.parquet"
  },
  "totalRecordsDeleted" : 0,
  "totalLogRecordsCompacted" : 0,
  "totalLogFilesCompacted" : 0,
  "totalCompactedRecordsUpdated" : 0,
  "totalLogFilesSize" : 0,
  "totalScanTime" : 0,
  "totalCreateTime" : 0,
  "totalUpsertTime" : 0,
  "minAndMaxEventTime" : {
    "Optional.empty" : {
      "val" : null,
      "present" : false
    }
  },
  "writePartitionPaths" : [ "dt=2021-05-05" ]
}
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:347)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:280)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:182)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:180)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:509)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
This happens because, even though --checkpoint earliest was passed, the code hardcodes the checkpoint value to null. In the exception message you can see the commit metadata fetched from the commit: "deltastreamer.checkpoint.reset_key" : "earliest", "deltastreamer.checkpoint.key" : null (in the latest version a null checkpoint is simply not saved, i.e. the key is absent). The corresponding code is in the class org.apache.hudi.utilities.sources.SqlSource:
return Pair.of(Option.of(source), null);
With a null checkpoint, HoodieDeltaStreamer can no longer write incrementally to this table. To fix it, simply change the code to:
return Pair.of(Option.of(source), "0");
I have pushed this change to https://gitee.com/dongkelun/hudi/commits/0.9.0; that branch also contains other fixes for 0.9.0. Besides this exception, a Hive version mismatch may also throw an exception about the missing method setQueryTimeout; the fix for that is on the same branch as well.
The latest version (0.11) attempts to fix this problem, PR: https://github.com/apache/hudi/pull/3648, which can be used as a reference. That PR adds the parameter --allow-commit-on-no-checkpoint-change, which skips the "No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(Optional.empty). New Checkpoint=(null)" path. It is described as follows:
allow commits even if checkpoint has not changed before and after fetch data from source. This might be useful in sources like SqlSource where there is not checkpoint. And is not recommended to enable in continuous mode
That is, it allows a commit even when the checkpoint has not changed after fetching from the source, which is useful for sources like SqlSource that have no checkpoint, but it is not recommended in continuous mode. However, --checkpoint still must not be used together with SqlSource, otherwise the exception above is thrown again, so I opened PR https://github.com/apache/hudi/pull/5633 to try to address this; whether the community will accept it remains to be seen.
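Under that PR, the SqlSource invocation would roughly become the following (a sketch only: the flag name comes from the PR, the jar path is a placeholder, and the remaining options are assumed unchanged from the 0.9.0 command above):
spark-submit --conf "spark.sql.catalogImplementation=hive" \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /path/to/hudi-utilities-bundle.jar \
--props file:///opt/dkl/sql_source.properties \
--target-base-path /hudi/test_hudi_target \
--target-table test_hudi_target \
--op BULK_INSERT \
--table-type COPY_ON_WRITE \
--source-ordering-field ts \
--source-class org.apache.hudi.utilities.sources.SqlSource \
--allow-commit-on-no-checkpoint-change \
--enable-sync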
DFSSource
Distributed File System (DFS)
Converting historical JSON data on DFS; multiple data formats are supported.
Create the historical Hive table, stored as JSON
create table test.test_source_json(
  id int,
  name string,
  price double,
  ts bigint,
  dt string
)
row format serde 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE;
Insert data
insert into test.test_source_json values (1,'hudi', 10.0,100,'2021-05-05');
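Since the table is a TEXTFILE with JsonSerDe, the file under /test/test_source_json is plain JSON lines, which is exactly what JsonDFSSource reads; roughly like this (the file name is just a typical Hive output name and may differ):
# hdfs dfs -cat /test/test_source_json/000000_0   (file name assumed)
{"id":1,"name":"hudi","price":10.0,"ts":100,"dt":"2021-05-05"}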
Configuration file
dfs_source.properties
This example uses a non-partitioned table.
include=common.properties

hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=
hoodie.deltastreamer.source.dfs.root=/test/test_source_json
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url=jdbc:hive2://10.110.105.163:10000/hudi;principal=hive/indata-10-110-105-163.indata.com@INDATA.COM

hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type=org.apache.hive.jdbc.HiveDriver
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username=user
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password=password
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable=test.test_source_json
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout=100

hoodie.datasource.hive_sync.table=test_hudi_target_json
hoodie.datasource.hive_sync.database=hudi
Command
spark-submit --principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/hdp/3.1.0.0-78/spark2/jars/hudi-utilities-bundle_2.11-0.9.0.jar \
--props file:///opt/dkl/dfs_source.properties \
--target-base-path /hudi/test_hudi_target_json \
--target-table test_hudi_target_json \
--op BULK_INSERT \
--table-type COPY_ON_WRITE \
--schemaprovider-class org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider \
--enable-sync
--source-class is not specified here because its default value is JsonDFSSource.
JdbcbasedSchemaProvider is used to obtain the schema because I have no experience generating avsc files; between the two options, I chose fetching the schema via JDBC.
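For comparison, the FilebasedSchemaProvider alternative would need an Avro schema file such as /path/source.avsc; a sketch of what it could look like for this table (field types inferred from the Hive DDL above, the file content itself is assumed):
{
  "type": "record",
  "name": "test_source_json",
  "fields": [
    {"name": "id",    "type": ["null", "int"],    "default": null},
    {"name": "name",  "type": ["null", "string"], "default": null},
    {"name": "price", "type": ["null", "double"], "default": null},
    {"name": "ts",    "type": ["null", "long"],   "default": null},
    {"name": "dt",    "type": ["null", "string"], "default": null}
  ]
}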
As explained earlier, if you need a managed table in 0.9.0 you still have to create the table with Spark SQL first; in 0.11.0 the corresponding parameters can simply be configured.
The default code for reading the Hive table schema throws an exception, shown below:
Exception in thread "main" org.apache.hudi.exception.HoodieException: Failed to get Schema through jdbc.
    at org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider.getSourceSchema(JdbcbasedSchemaProvider.java:81)
    at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:42)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:860)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:235)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:654)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:143)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:116)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: test.test_source_json table does not exists!
    at org.apache.hudi.utilities.UtilHelpers.getJDBCSchema(UtilHelpers.java:439)
    at org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider.getSourceSchema(JdbcbasedSchemaProvider.java:79)
    ... 19 more
The first exception is: table does not exists! Of course the root cause is not that the table does not exist; the source code needs to be modified, and I have pushed the change to https://gitee.com/dongkelun/hudi/commits/0.9.0. Two changes address this exception. One is that a Hive version mismatch throws an exception about the missing method setQueryTimeout; here I simply removed both calls to setQueryTimeout. The other is that the original tableExists returns false whenever it hits an exception, and the later logic turns a false return directly into the 'table does not exists!' exception, which hides the real root cause (there can be other causes, for example a Kerberos permission problem). I changed it to print the exception instead, so the cause is easier to analyze; I submitted a PR for this: https://github.com/apache/hudi/pull/5827
Original code
  private static Boolean tableExists(Connection conn, Map<String, String> options) {
    JdbcDialect dialect = JdbcDialects.get(options.get(JDBCOptions.JDBC_URL()));
    try (PreparedStatement statement = conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME())))) {
      statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
      statement.executeQuery();
    } catch (SQLException e) {
      return false;
    }
    return true;
  }
After the change:
  private static Boolean tableExists(Connection conn, Map<String, String> options) {
    JdbcDialect dialect = JdbcDialects.get(options.get(JDBCOptions.JDBC_URL()));
    try (PreparedStatement statement = conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME())))) {
      statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
      statement.executeQuery();
    } catch (SQLException e) {
      e.printStackTrace();
    }
    return true;
  }
The next exception only requires changing the Hive configuration hive.resultset.use.unique.column.names=false, which was already mentioned in "Apache Hudi 入门学习总结".
Exception in thread "main" org.apache.hudi.exception.HoodieException: Failed to get Schema through jdbc.
    at org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider.getSourceSchema(JdbcbasedSchemaProvider.java:81)
    at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:42)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:730)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:220)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:606)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:143)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:107)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:509)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.avro.SchemaParseException: Illegal character in: test_source_json.id
    at org.apache.avro.Schema.validateName(Schema.java:1151)
    at org.apache.avro.Schema.access$200(Schema.java:81)
    at org.apache.avro.Schema$Field.<init>(Schema.java:403)
    at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
    at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2120)
    at org.apache.avro.SchemaBuilder$FieldBuilder.access$5200(SchemaBuilder.java:2034)
    at org.apache.avro.SchemaBuilder$GenericDefault.noDefault(SchemaBuilder.java:2417)
    at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:177)
    at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$$anonfun$5.apply(SchemaConverters.scala:174)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
    at org.apache.hudi.spark.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:174)
    at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:63)
    at org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema(AvroConversionUtils.scala)
    at org.apache.hudi.utilities.UtilHelpers.getJDBCSchema(UtilHelpers.java:388)
    at org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider.getSourceSchema(JdbcbasedSchemaProvider.java:79)
    ... 19 more
KafkaSource
HoodieDeltaStreamer supports two Kafka data formats, Avro and Json, corresponding to AvroKafkaSource and JsonKafkaSource. JsonKafkaSource is used here because it is easier to generate test data for.
Kafka configuration files
kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
keyTab="./kafka.service.keytab"
principal="kafka/indata-10-110-105-163.indata.com@INDATA.COM"
serviceName="kafka"
storeKey=true
renewTicket=true;
};
producer.properties
This configuration is used to produce test data into Kafka.
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
Produce test data
First authenticate with Kerberos via kinit:
kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/indata-10-110-105-163.indata.com@INDATA.COM
/usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-producer.sh --broker-list indata-10-110-105-163.indata.com:6667 --topic test_hudi_target_topic --producer.config=producer.properties
{"id":1,"name":"hudi","price":11.0,"ts":100,"dt":"2021-05-05"}
{"id":2,"name":"hudi","price":12.0,"ts":100,"dt":"2021-05-05"}
{"id":3,"name":"hudi","price":13.0,"ts":100,"dt":"2021-05-06"}
Consume
Consume the topic from the command line to verify that the data was written to it:
/usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-consumer.sh --bootstrap-server indata-10-110-105-163.indata.com:6667 --from-beginning --topic test_hudi_target_topic --group dkl_hudi --consumer-property security.protocol=SASL_PLAINTEXT
{"id":1,"name":"hudi","price":11.0,"ts":100,"dt":"2021-05-05"}
{"id":2,"name":"hudi","price":12.0,"ts":100,"dt":"2021-05-05"}
{"id":3,"name":"hudi","price":13.0,"ts":100,"dt":"2021-05-06"}
Hudi configuration file
kafka_source.properties
include=common.properties

hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=dt

hoodie.deltastreamer.source.kafka.topic=test_hudi_target_topic
bootstrap.servers=indata-10-110-105-162.indata.com:6667,indata-10-110-105-163.indata.com:6667,indata-10-110-105-164.indata.com:6667
auto.offset.reset=earliest
group.id=dkl_hudi
security.protocol=SASL_PLAINTEXT

hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url=jdbc:hive2://10.110.105.163:10000/default;principal=hive/indata-10-110-105-163.indata.com@INDATA.COM
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type=org.apache.hive.jdbc.HiveDriver
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username=user
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password=password
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable=test.test_source_json
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout=100

hoodie.datasource.hive_sync.table=test_hudi_target_kafka
hoodie.datasource.hive_sync.database=hudi
hoodie.datasource.hive_sync.partition_fields=dt
Like the DFS source, the Kafka source also needs a table schema. Since the data comes from Kafka and there is no source table, the schema is read from test.test_source_json, the table created in the DFSSource example above; any table works as long as its structure matches.
Command
spark-submit --principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \
--files ./kafka_client_jaas.conf,./kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/hdp/3.1.0.0-78/spark2/jars/hudi-utilities-bundle_2.11-0.9.0.jar \
--props file:///opt/dkl/kafka_source.properties \
--target-base-path /hudi/test_hudi_target_kafka \
--target-table test_hudi_target_kafka \
--op UPSERT \
--table-type COPY_ON_WRITE \
--schemaprovider-class org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--enable-sync
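After the job finishes, the three Kafka records should be queryable from the synced table, for example:
-- quick check (example query)
select id, name, price, ts, dt from hudi.test_hudi_target_kafka order by id;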
Everything above is a one-off read-and-convert. Kafka can also be read in continuous mode to pick up incremental data, using the --continuous parameter, i.e.:
spark-submit --principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \
--files ./kafka_client_jaas.conf,./kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/hdp/3.1.0.0-78/spark2/jars/hudi-utilities-bundle_2.11-0.9.0.jar \
--props file:///opt/dkl/kafka_source.properties \
--target-base-path /hudi/test_hudi_target_kafka \
--target-table test_hudi_target_kafka \
--op UPSERT \
--table-type COPY_ON_WRITE \
--schemaprovider-class org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--enable-sync \
--continuous
In continuous mode the default interval is 0 seconds, i.e. it keeps checking the checkpoint against the Kafka offsets for new data without pausing; the interval can be changed with --min-sync-interval-seconds, for example --min-sync-interval-seconds 60 to poll every 60 seconds.
We can produce a few more JSON records into the Kafka topic to verify that incremental data is read correctly.
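For example, records like the following could be produced with the same kafka-console-producer command (values made up; id 1 with a larger ts should show up as an upsert of the existing row):
{"id":4,"name":"hudi","price":14.0,"ts":200,"dt":"2021-05-06"}
{"id":1,"name":"hudi_new","price":11.5,"ts":200,"dt":"2021-05-05"}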
Multi-table ingestion
Reference blog post: https://hudi.apache.org/blog/2020/08/22/ingest-multiple-tables-using-hudi/
Using the Kafka JSON example: first create two empty tables for obtaining schemas, test.test_source_json_1 and test.test_source_json_2; then create two Kafka topics, test_hudi_target_topic_1 and test_hudi_target_topic_2, and produce data into them; finally use HoodieMultiTableDeltaStreamer to write data into the two Hudi tables test_hudi_target_kafka_1 and test_hudi_target_kafka_2.
Configuration files
kafka_source_multi_table.properties
hoodie.deltastreamer.ingestion.tablesToBeIngested=hudi.test_hudi_target_kafka_1,hudi.test_hudi_target_kafka_2
hoodie.deltastreamer.ingestion.hudi.test_hudi_target_kafka_1.configFile=file:///opt/dkl/multi/config_table_1.properties
hoodie.deltastreamer.ingestion.hudi.test_hudi_target_kafka_2.configFile=file:///opt/dkl/multi/config_table_2.properties

#Kafka props
bootstrap.servers=indata-10-110-105-162.indata.com:6667,indata-10-110-105-163.indata.com:6667,indata-10-110-105-164.indata.com:6667
auto.offset.reset=earliest
group.id=dkl_hudi
security.protocol=SASL_PLAINTEXT
config_table_1.properties
include=common.properties
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=dt

hoodie.deltastreamer.source.kafka.topic=test_hudi_target_topic_1

hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url=jdbc:hive2://10.110.105.163:10000/default;principal=hive/indata-10-110-105-163.indata.com@INDATA.COM

hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type=org.apache.hive.jdbc.HiveDriver
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username=user
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password=password
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable=test.test_source_json_1
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout=100

hoodie.datasource.hive_sync.table=test_hudi_target_kafka_1
hoodie.datasource.hive_sync.database=hudi
hoodie.datasource.hive_sync.partition_fields=dt
config_table_2.properties
include=common.properties
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=dt

hoodie.deltastreamer.source.kafka.topic=test_hudi_target_topic_2

hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url=jdbc:hive2://10.110.105.163:10000/default;principal=hive/indata-10-110-105-163.indata.com@INDATA.COM

hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type=org.apache.hive.jdbc.HiveDriver
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username=user
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password=password
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable=test.test_source_json_2
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout=100

hoodie.datasource.hive_sync.table=test_hudi_target_kafka_2
hoodie.datasource.hive_sync.database=hudi
hoodie.datasource.hive_sync.partition_fields=dt
spark-submit --principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \
--files "./kafka_client_jaas.conf,./kafka.service.keytab" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer /usr/hdp/3.1.0.0-78/spark2/jars/hudi-utilities-bundle_2.11-0.9.0.jar \
--props file:///opt/dkl/multi/kafka_source_multi_table.properties \
--base-path-prefix / \
--config-folder file:///opt/dkl/multi \
--target-table test_hudi_target_kafka_1 \
--op UPSERT \
--table-type COPY_ON_WRITE \
--schemaprovider-class org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--enable-hive-sync
Note that --target-table is required here; fill in either of the table names. In my opinion it should not be required, since both table names are already in the configuration file. Also, for Hive sync, 0.9.0 only has the --enable-hive-sync parameter and no --enable-sync; the latest version does have --enable-sync, yet --enable-hive-sync is not deprecated there, so HoodieMultiTableDeltaStreamer and HoodieDeltaStreamer are inconsistent for the same options; presumably fewer people use it, so fewer people contribute to it. Regarding whether the --target-table parameter can be removed, I have submitted PR: https://github.com/apache/hudi/pull/5883
We did not specify target-base-path here, so how does the program know the table path? Reading the source code shows it is computed as:
String targetBasePath = basePathPrefix + "/" + database + "/" + tableName;
The code is in the method resetTarget. Since the database we created is located at /hudi, --base-path-prefix is set to / here; for example, the table test_hudi_target_kafka_1 in database hudi then ends up under /hudi/test_hudi_target_kafka_1.
Run the command above and check that the data from each topic is read and written into the corresponding table, for example with the queries below.
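-- example checks: each table should contain the records produced into its topic
select count(*) from hudi.test_hudi_target_kafka_1;
select count(*) from hudi.test_hudi_target_kafka_2;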
HiveSchemaProvider
Corresponding class: org.apache.hudi.utilities.schema.HiveSchemaProvider
As mentioned above, the main reason for using SqlSource was to avoid providing a JDBC URL and the like just to obtain the schema, but SqlSource has its own problems, while other sources such as DFSSource and KafkaSource require JDBC details and are cumbersome to configure. SqlSource also cannot be used for reading Kafka increments; it is only suitable for a one-time conversion. In 0.9.0, incremental ingestion can only obtain the schema through the JDBC-related configuration, and the default code for reading the Hive schema has bugs or is not general (a Hive version mismatch throws an exception about the missing setQueryTimeout method), so the code has to be patched and rebuilt before it can be used. Version 0.11.0 adds HiveSchemaProvider, which should make it possible to obtain the schema by specifying only the database and table names; its usage is tried out below.
HiveSchemaProvider provides four parameters:
  private static final String SOURCE_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.database";
  private static final String SOURCE_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.table";
  private static final String TARGET_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.database";
  private static final String TARGET_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.table";
These are the source-schema database name, source-schema table name, target-schema database name, and target-schema table name. The target-schema settings are optional: when they are not configured, the target schema defaults to the source schema. The same logic also applies to other SchemaProviders, such as the JdbcbasedSchemaProvider used in the examples above, except that JdbcbasedSchemaProvider has no target-schema parameters, only source-schema ones. The optional target-schema settings would look like the snippet below.
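A sketch of the optional target-schema configuration, only needed when the target schema should come from a different Hive table (the database and table values here are just example placeholders):
# optional: target schema from a different Hive table (example values)
hoodie.deltastreamer.schemaprovider.target.schema.hive.database=hudi
hoodie.deltastreamer.schemaprovider.target.schema.hive.table=test_hudi_target_json_2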
Configuration parameters
We take the JsonDFSSource example from above as a starting point.
json_dfs_source.properties
This example uses a non-partitioned table.
include=common.properties

hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=
hoodie.deltastreamer.source.dfs.root=/test/test_source_json

# obtain the schema via HiveSchemaProvider
hoodie.deltastreamer.schemaprovider.source.schema.hive.database=test
hoodie.deltastreamer.schemaprovider.source.schema.hive.table=test_source_json

hoodie.datasource.hive_sync.table=test_hudi_target_json_2
hoodie.datasource.hive_sync.database=hudi
Command
spark-submit --conf "spark.sql.catalogImplementation=hive" \
--principal hive/indata-10-110-105-163.indata.com@INDATA.COM --keytab /etc/security/keytabs/hive.service.keytab \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/hdp/3.1.0.0-78/spark2/jars/hudi-utilities-bundle_2.11-0.11.0.jar \
--props file:///opt/dkl/json_dfs_source.properties \
--target-base-path /hudi/test_hudi_target_json_2 \
--target-table test_hudi_target_json_2 \
--op BULK_INSERT \
--table-type COPY_ON_WRITE \
--schemaprovider-class org.apache.hudi.utilities.schema.HiveSchemaProvider \
--enable-sync
First upgrade the jar to 0.11.0 or later and run the command above. It may fail with the exception below, because HiveSchemaProvider needs hive-hcatalog-core.jar to read a JSON-format table. Run ls | grep hcatalog-core under the Hive lib directory to find that jar, copy it into the Spark jars directory (see the shell sketch after the exception), and rerun; the table schema can then be read successfully and the JSON data converted into the Hudi target table.
22/06/15 16:12:09 ERROR log: error in initSerDe: java.lang.ClassNotFoundException Class org.apache.hive.hcatalog.data.JsonSerDe not found
java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2500)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:385)
    at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276)
    at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:258)
    at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:605)
    at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$7.apply(HiveClientImpl.scala:373)
    at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$7.apply(HiveClientImpl.scala:370)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:370)
    at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:368)
    at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:277)
    at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:215)
    at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:214)
    at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:260)
    at org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:368)
    at org.apache.spark.sql.hive.client.HiveClient$class.getTable(HiveClient.scala:81)
    at org.apache.spark.sql.hive.client.HiveClientImpl.getTable(HiveClientImpl.scala:84)
    at org.apache.spark.sql.hive.HiveExternalCatalog.getRawTable(HiveExternalCatalog.scala:118)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$getTable$1.apply(HiveExternalCatalog.scala:700)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$getTable$1.apply(HiveExternalCatalog.scala:700)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
    at org.apache.spark.sql.hive.HiveExternalCatalog.getTable(HiveExternalCatalog.scala:699)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getTable(ExternalCatalogWithListener.scala:138)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:434)
    at org.apache.hudi.utilities.schema.HiveSchemaProvider.<init>(HiveSchemaProvider.java:65)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:118)
    at org.apache.hudi.utilities.UtilHelpers.createSchemaProvider(UtilHelpers.java:155)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:656)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:143)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:116)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:557)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
22/06/15 16:12:09 ERROR Table: Unable to get field from serde: org.apache.hive.hcatalog.data.JsonSerDe
java.lang.RuntimeException: MetaException(message:java.lang.ClassNotFoundException Class org.apache.hive.hcatalog.data.JsonSerDe not found)
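A sketch of the jar copy described above (the exact paths assume the HDP 3.1 layout used in this post; adjust to your environment):
# locate hive-hcatalog-core under the Hive lib directory (path assumed)
cd /usr/hdp/3.1.0.0-78/hive/lib && ls | grep hcatalog-core
# copy it into the Spark jars directory, then rerun the spark-submit command above
cp hive-hcatalog-core-*.jar /usr/hdp/3.1.0.0-78/spark2/jars/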
As the example configuration shows, obtaining the schema with HiveSchemaProvider requires little configuration and is convenient to use. If you need the target schema to differ from the source schema, you can try configuring the target-schema database and table names yourself. And if you find HiveSchemaProvider convenient for other source types, you can adapt their configuration accordingly.
Summary
This post has summarized how to use Hudi DeltaStreamer, the various problems encountered along the way, and their solutions, mainly using the tool to convert historical tables into Hudi tables and to write incremental data into Hudi tables. It can also sync table data from relational databases into Hudi tables, which is not demonstrated here. Because there were many issues, this write-up is somewhat scattered; a follow-up post will reorganize the material and look at it from the principle and source-code perspective, though with fewer examples.




