原文链接:https://databricks.com/blog/2020/01/29/query-delta-lake-tables-presto-athena-improved-operations-concurrency-merge-performance.html
文章转自:https://yq.aliyun.com/articles/748544?spm=a2c4e.11163080.searchblog.95.4b412ec1W8FiXs
最近,Delta Lake发布了0.5.0版本,该版本加入了对Presto和Athena的支持,以及提升了操作的并发性,本文将对Delta Lake 0.5.0版本的变化进行一个简单的介绍。
Delta Lake 0.5.0发布的几个最重要的特性如下:
通过使用Manifest文件能够,支持其他数据处理引擎,现在能够使用Scala、Java、Python和SQL的API生成Manifest文件,并使用该文件通过Presto和Amazon Athena访问Delta Lake中的表数据,详细的使用方式见 https://docs.delta.io/0.5.0/presto-integration.html 提升了Delta Lake所有操作的并发性,现在可以并发执行更多的Delta Lake操作。通过使用更加精细的冲突检测策略,Delta Lake的乐观并发控制得到了有效的改善,这使得我们能够更加容易地在Delta表上执行复杂的工作流。举例来说: 我们可以在给新分区添加内容的同时,在旧分区上执行delete操作; 在不相交的一组分区上同时执行update和merge操作; 让文件合并和增加文件内容同时执行
想要获取更多的信息,可以参考开源Delta Lake 0.5.0的release notes。在此博客文章中,我们将详细介绍如何使用Presto读取Delta Lake表、操作并发性的提升以及使用insert-only merge操作来更方便快速地去除重复数据。
使用Presto读取Delta Lake表
正如这篇文章《Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs》 所说,Delta Lake 的一些修改数据的操作,如delete操作,是通过给包含删除数据的文件写一个新版本,并只是将旧版本文件标记为已删除来实现的。Delta Lake采用这种方法的优势在于能够让我们查询旧版本的数据。如果我们想要了解哪些数据(或行)包含最新的数据,默认情况下我们可以去查询事务日志。其他数据处理系统,如Presto和Athena想要获取这些信息,可以通过读取Delta Lake生成的一种清单文本文件——Manifest,该文件中包含查询Delta Lake表需要读取的数据文件列表。为了实现Presto和Athena读取Delta Lake表,我们可以通过执行一些Python命令来实现,详细的内容可以参考《 Set up the Presto or Athena to Delta Lake integration and query Delta tables 》。
生成Delta Lake的Manifest文件
首先,使用以下代码片段创建Delta Lake的Manifest文件:
deltaTable = DeltaTable.forPath(pathToDeltaTable)deltaTable.generate("symlink_format_manifest")
正如代码字面意思所示,以上操作将在表根目录中生成Manifest文件。如果你根据 《Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs 》这篇文章介绍的内容创建了departureDelays
表,将会在表根目录中产生一个新的文件夹:
$/departureDelays.delta/_symlink_format_manifest
该文件夹中会有一个名为manifest的文本文件。如果你查看manifest文件的内容(例如使用cat命令),你将能看到类似以下的文本内容,它们指示了包含最新快照的文件。
file:$/departureDelays.delta/part-00003-...-c000.snappy.parquetfile:$/departureDelays.delta/part-00006-...-c000.snappy.parquetfile:$/departureDelays.delta/part-00001-...-c000.snappy.parquetfile:$/departureDelays.delta/part-00000-...-c000.snappy.parquetfile:$/departureDelays.delta/part-00000-...-c000.snappy.parquetfile:$/departureDelays.delta/part-00001-...-c000.snappy.parquetfile:$/departureDelays.delta/part-00002-...-c000.snappy.parquetfile:$/departureDelays.delta/part-00007-...-c000.snappy.parquet
创建Presto表以读取生成的Manifest文件
接下来的步骤是在Hive Metastore中创建一个外部表,以便Presto(或Athena)可以读取上一步生成的Manifest文件,来获得需要读取的Parquet文件,以读取Delta Lake表的最新快照。需要说明的是,对于Presto,你可以使用Apache Spark或Hive CLI来运行以下命令:
1. CREATE EXTERNAL TABLE departureDelaysExternal ( ... )2. ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'3. STORED AS INPUTFORMAT4. OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'5. LOCATION '$/departureDelays.delta/_symlink_format_manifest'
一些重要的说明:
第一行所定义的schema必须和Delta Lake中的schema相同 第五行需要指向Manifest文件的位置—— _symlink_format_manifest
Presto(或Athena)需要配置SymlinkTextInputFormat
才能从Manifest文件中获取Parquet数据文件的列表,而不是使用目录列表中的文件。需要说明的是,如果想要使用分区表,需要按照《Configure Presto to read the generated manifests》这篇文章进行一些额外的步骤。
更新Manifest文件
需要注意的是,如果Delata Lake的数据有更新,都需要重新生成Manifest文件,以便Presto能够获取到最新的数据。
操作并发性的提升
在Delta Lake 0.5.0版本,我们能够同时执行更多的操作。通过更细粒度的冲突检测,这些最新的更新让Delta Lake能够更容易地在Delta Lake表上执行复杂的工作流,例如:
可以在给新分区添加内容的同时,在旧分区上执行delete操作; 追加文件内容的同时执行文件合并操作; 在不相交的一组分区上同时执行update和merge操作。
并发追加文件内容的用例
举个例子,当我们在执行merge操作的同时,如果有并发的事务向同一个分区写入记录,Delta Lake往往会抛出ConcurrentAppendException
异常。
// Target 'deltaTable' is partitioned by date and countrydeltaTable.as("t").merge(source.as("s"),"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country").whenMatched().updateAll().whenNotMatched().insertAll().execute()
上面的代码段就有可能会引发冲突,因为即使表已经按照date和country进行了分区,条件仍然不够明确。问题在于,这个查询将扫描整个表,从而可能与update任何其他分区的并发操作发生冲突。通过指定specificDate
和specificCountry
,以便可以在特定的date和country进行merge操作,现在我们就可以安全地在不同的date和contry同时执行此操作。
// Target 'deltaTable' is partitioned by date and countrydeltaTable.as("t").merge(source.as("s"),"s.user_id = t.user_id AND d.date = '" + specificDate + "' AND d.country = '" + specificCountry + "'").whenMatched().updateAll().whenNotMatched().insertAll().execute()
以上方法适用于其他所有的Delta Lake操作(如delete、更改元数据等)。
并发文件合并
如果你连续不断地将数据写入Delta表,随着时间的流逝,将会累积出大量的文件。这在流式数据场景中尤为重要,因为此时是以比较小的batch写入数据的,这将会导致文件系统不断地累积小文件,随着时间的推移,小文件的数量会不断增加,会降低查询的性能。优化这种场景的一个比较重要的方式就是定期获取大批量的小文件,并将其重写为数量比较小的大文件(文件合并)。过去,在同时进行数据查询和执行文件合并时,出现异常的可能性会非常高。但是现在,由于Delta Lake 0.5.0版本的优化改进,我们可以同时执行查询操作(包括流式查询)和文件的合并,并且不会有任何异常产生。举个例子来说,如果你的表已经进行了分区,并且你只想基于谓词对一个分区进行重新分区,则可以使用where来仅读取该分区,并使用replaceWhere回写该分区:
path = "..."partition = "year = '2019'"numFilesPerPartition = 16 # Compact partition of a table to no. of files(spark.read.format("delta").load(path).where(partition).repartition(numFilesPerPartition).write.option("dataChange", "false").format("delta").mode("overwrite").option("replaceWhere", partition).save(path))
以上代码中需要注意的是,仅在没有数据更改时,才使用dataChange == false选项,否则可能会破坏底层数据。
使用Insert-only Merge操作方便快速地去除重复数据
一个场景的ETL用例是搜集日志,并将其附加到Delta Lake表当中,一个比较常见的问题是数据源会产生重复的日志记录。通过使用Delta Lake的merge,你可以避免插入这些重复的记录,例如以下涉及merge以及update航班数据的代码:
# Merge merge_table with flightsdeltaTable.alias("flights") \.merge(merge_table.alias("updates"),"flights.date = updates.date") \.whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \.whenNotMatchedInsertAll() \.execute()
在Delta Lake 0.5.0版本之前,不可能从Delta Lake表中将重复数据作为流进行读取,因为insert-only merge并不是纯粹地将数据追加到表中。例如,在流查询中,你可以在foreachBatch
中执行merge操作来连续不断地将流数据写入Delta Lake表当中,并将需要删除的重复数据打上标记。以下PySpark的代码展示了这个场景:
from delta.tables import *deltaTable = DeltaTable.forPath(spark, "/data/aggregates")# Function to upsert microBatchOutputDF into Delta table using mergedef upsertToDelta(microBatchOutputDF, batchId):deltaTable.alias("t").merge(microBatchOutputDF.alias("s"),"s.key = t.key") \.whenMatchedUpdateAll() \.whenNotMatchedInsertAll() \.execute()}# Write the output of a streaming aggregation query into Delta tablestreamingAggregatesDF.writeStream \.format("delta") \.foreachBatch(upsertToDelta) \.outputMode("update") \.start()
在另一个流式查询中,你可以从该Delta Lake表中连续读取需要删除的重复数据。这是可能的,因为insert-only merge操作(在Delta Lake 0.5.0版本引入)只会将新数据追加到Delta Lake表中。
欢迎关注数据湖技术社区
数据湖开发者社区由 阿里云开发者社区 与 阿里云Data Lake Analytics团队 共同发起,致力于推广数据湖相关技术,包括hudi、delta、spark、presto、oss、元数据、存储加速、格式发现等,学习如何构建数据湖分析系统,打造适合业务的数据架构。扫描下方钉钉群二维码,加入社区一起学习讨论。

阿里云Data Lake Analytics是Serverless化的交互式联邦查询服务。使用标准SQL即可轻松分析与集成对象存储(OSS)、数据库(PostgreSQL/MySQL等)、NoSQL(TableStore等)数据源的数据。
Data Lake Analytics产品详情页:https://www.aliyun.com/product/datalakeanalytics
Data Lake Analytics 1元购入口:https://common-buy.aliyun.com/?commodityCode=openanalytics_post#/buy




