暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

数据湖正当时!华为云MRS重磅集成Apache Hudi

ApacheHudi 2021-08-02
1900


Apache Hudi是数据湖的文件组织层,对Parquet等格式文件进行管理提供数据湖能力,支持多种计算引擎,提供IUD接口,在 HDFS/OBS的数据集上提供了插入更新和增量拉取的流原语。

Hudi特性

ACID事务能力,支持实时入湖和批量入湖。多种视图能力(读优化视图/增量视图/实时视图),支持快速数据分析。MVCC设计,支持数据版本回溯。自动管理文件大小和布局,以优化查询性能准实时摄取,为查询提供最新数据。支持并发读写,基于snapshot的隔离机制实现写入时可读取。支持原地转表,将存量的历史表转换为Hudi数据集。

Hudi关键技术和优势

可插拔索引机制:Hudi提供多种索引机制,可以快速完成对海量数据的更新和删除操作。良好的生态支持:Hudi支持多种数据引擎接入包括Hive、Spark、Flink。

Hudi支持两种表类型

Copy On Write

写时复制表也简称cow表,使用parquet文件存储数据,内部的更新操作需要通过重写原始parquet文件完成。

优点:读取时,只读取对应分区的一个数据文件即可,较为高效

缺点:数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。且由于耗时,读请求读取到的数据相对就会滞后

Merge On Read

读时合并表也简称mor表,使用列格式parquet和行格式Avro两种方式混合存储数据。其中parquet格式文件用于存储基础数据,Avro格式文件(也可叫做log文件)用于存储增量数据。

优点:由于写入数据先写delta log,且delta log较小,所以写入成本较低

缺点:需要定期合并整理compact,否则碎片文件较多。读取性能较差,因为需要将delta log 和 老数据文件合并

Hudi支持三种视图,不同场景提供相应的读能力

Snapshot View

实时视图:该视图提供当前hudi表最新的快照数据,即一旦有最新的数据写入hudi表,通过该视图就可以查出刚写入的新数据。

cow表和mor均支持这种视图能力。

Incremental View

增量视图:该视图提供增量查询的能力,可以查询指定COMMIT之后的增量数据,可用于快速拉取增量数据。

cow表支持该种视图能力, mor表也可以支持该视图,但是一旦mor表完成compact操作其增量视图能力消失。

Read Optimized View

读优化视图:该视图只会提供最新版本的parquet文件中存储的数据。

该视图在cow表和mor表上表现不同:

对于cow表,该视图能力和实时视图能力是一样的(cow表只用parquet文件存数据)。对于mor表,仅访问基本文件,提供给定文件片自上次执行compact操作以来的数据, 可简单理解为该视图只会提供mor表parquet文件存储的数据,log文件里面的数据将被忽略。该视图数据并不一定是最新的,但是mor表一旦完成compact操作,增量log数据被合入到了base数据里面,这个时候该视图和实时视图能力一样。

Hudi数据的实时写入

Hudi支持多种写入方式,实时写入方面MRS 3.1.0中集成了Hudi基于Spark的写入工具DeltaStreamer,不久的将来MRS将在升级Hudi 0.8.0版本,实现了对FlinkSQL写入Hudi的支持。

DeltaStreamer支持单次写入和持续写入两种方式,在持续写入的模式中,支持自动调度compaction和clean任务。同时,DeltaStreamer还支持将同步创建Hive表和分区,便于使用SparkSQL,Hetu等引擎直接通过Hive表查询Hudi数据。

一般场景下,DeltaStreamer通过配置即可拉起任务。以读取Kafka json格式数据为例,首先编写源和目的数据schema,形如:

{
"namespace":"hoodie",
"type": "kafka",
"name": "test",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "id",
"type": "string"
},
{
"name": "score",
"type": "int"
},
{
"name": "grade",
"type": "int"
}
]
}

然后编辑DeltaStreamer运行配置:

#Kafka源数据schema hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://hacluster/tmp/sourceschema.json

#Hudi目的数据schema hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://hacluster/tmp/targetschema.json

#进行precombine的字段,当两条数据有相同主键时,根据该字段选择值更大的数据写入,值大小使用Object.compareTo(..)判断 hoodie.datasource.write.precombine.field=score

#数据主键 hoodie.datasource.write.recordkey.field=id

#分区字段 hoodie.datasource.write.partitionpath.field=grade

#是否启动同步Hive表 hoodie.datasource.hive_sync.enable=true

#是否使用元数据方式同步Hive表 hoodie.datasource.meta.sync.enable=true

#Hive表同步使用的database hoodie.datasource.hive_sync.database=default

#同步到Hive的表名 hoodie.datasource.hive_sync.table=test

#同步Hive表使用的工具类 hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool

#同步Hive表的数据格式 hoodie.datasource.hive_sync.base_file_format=PARQUET

#同步Hive表的分区字段 hoodie.datasource.hive_sync.partition_fields=grade

#Hive的JDBC连接地址 hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://zk_ip_1:2181,zk_ip_2:2181,zk_ip_3:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2

#Kafka Source topic hoodie.deltastreamer.source.kafka.topic=zhongyushuo

#Kafka props #Kafka集群Bootstrap Server地址 bootstrap.servers= 192.168.0.231:9092,192.168.0.83:9092,192.168.0.240:9092

配置文件编辑完成后,使用spark-submit命令提交任务,任务提交命令中可以指定表类型,操作类型(insert/bulk_insert/upsert)。

UPSERT(插入更新)是默认行为,写入是将根据索引,如果当前没有与要写入数据主键相同的数据则直接插入,如果存在与要写入数据主键相同的数据,则进行更新操作。INSERT(插入),与upsert类似,但是不会进行主键的索引查找,允许数据重复。BULK_INSERT(批插入)语义与INSERT相同,但是对于批量写入场景效率更高。DeltaStreamer任务提交命令如下:

source /opt/Bigdata/client/bigdata_env
source /opt/Bigdata/client/Hudi/component_env
spark-submit\
--name kafka2hudi\
--master yarn\
--deploy-mode cluster\
--driver-memory 4g\
--executor-memory 6g\
--executor-cores 4\
--num-executors 4\
--conf spark.kryoserializer.buffer.max=128m\
--conf spark.yarn.executor.memoryOverhead=4g\
--driver-class-path /opt/Bigdata/client/Hudi/hudi/conf:/opt/Bigdata/client/Hudi/hudi/lib/*:/opt/Bigdata/client/Spark2x/spark/jars/*:\
--jars /opt/Bigdata/client/Spark2x/spark/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.4.5-hw-ei-310012.jar,/opt/Bigdata/client/Spark2x/spark/jars/streamingClient010/kafka-clients-2.4.0-hw-ei-310012.jar\
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls opt/Bigdata/client/Hudi/hudi/lib/hudi-utilities_2.11-0.7.0-hw-ei-310012.jar`\
--props hdfs://hacluster/tmp/config.properties\
--target-base-path hdfs://haclustet/user/hudi/test/\
--source-ordering-field score\
--table-type MERGE_ON_READ\
--target-table test\
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource\
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider\
--enable-sync\
--op UPSERT\
--continuous

Hudi的使用场景举例

传统数仓如Hive等,没有更新和删除语义,和传统数据库等有较大差别。当需要将数据库更新数据写入Hive时,往往需要将整个分区的数据读取出来,使用Spark等离线引擎进行数据合并后,再将数据进行覆写,数据延迟达到小时级甚至天级。

而在使用Hudi的数据湖中,只需要利用Hudi的upsert语义即可将数据库中的更新实时同步到数据湖数据当中,实现数据延迟T+1到T+0的优化。

在该场景中,首先使用CDC工具将数据库的binlog解析出来,写入到消息队列中,再由DeltaStreamer拉取消息队列中的数据写入Hudi,如下图

数据实时写入Hudi时,利用DeltaStreamer同步Hive表的功能,新版本的SparkSQL、Hive、Hute等查询引擎即可保持SQL不变,进行实时视图的查询。

总结

华为云提供了大数据MapReduce服务(MRS),MRS是一个在华为云上部署和管理Hadoop系统的服务,一键即可部署Hadoop集群。MRS提供租户完全可控的一站式企业级大数据集群云服务,完全兼容开源接口,结合华为云计算、存储优势及大数据行业经验,为客户提供高性能、低成本、灵活易用的全栈大数据平台,轻松运行Hadoop、Spark、Flink、Hudi等大数据组件,并具备在后续根据业务需要进行定制开发的能力,帮助企业快速构建海量数据信息处理系统,并通过对海量信息数据实时与非实时的分析挖掘,发现全新价值点和企业商机。

MRS最新版本中已集成Hudi,客户可以在一键部署集群基于Hudi构建数据湖,更好地发掘数据价值。

推荐阅读

重磅!AWS升级对Apache Hudi的集成

恭喜!Apache Hudi社区新晋多名顶级互联网公司Committer

快手基于Apache Hudi的实践

Apache Hudi测试、运维操作万字总结

Streaming与Hudi、Hive湖仓一体!

文章转载自ApacheHudi,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论