译者
张鹏(卓昇),阿里云计算平台事业部技术专家。
前言
本文翻译自大数据技术公司 Databricks 针对数据湖 Delta Lake 系列技术文章。众所周知,Databricks 主导着开源大数据社区 Apache Spark、Delta Lake 以及 ML Flow 等众多热门技术,而 Delta Lake 作为数据湖核心存储引擎方案给企业带来诸多的优势。
此外,阿里云和 Apache Spark 及 Delta Lake 的原厂 Databricks 引擎团队合作,推出了基于阿里云的企业版全托管 Spark 产品 —— Databricks 数据洞察,该产品原生集成企业版 Delta Engine 引擎,无需额外配置,提供高性能计算能力。有兴趣的同学可以搜索` Databricks 数据洞察`或`阿里云 Databricks`进入官网,或者直接访问以下链接进行了解:
https://www.aliyun.com/product/bigdata/spark
本系列还包括其他内容,欢迎持续关注:
第一章:基础和性能
第二章:特性
01、【详谈 Delta Lake】系列专题 之 特性 - 01 为什么使用 Delta Lake 的 MERGE 功能?>>
02、(本文)使用 Python API 在 Delta Lake 数据表上操作/大型数据湖的 Time Travel 功能
03、 轻松克隆 Delta Lake / 在 Apache Spark 上的 Delta Lake 中启用 Spark SQL 语句
第三章:Lakehouse
第四章:Streaming
第五章:客户用例

Delta Lake技术系列 - 特性(Features)
——使用 Delta Lake 稳定的特性来可靠的管理您的数据

Chapter-02 使用Python API在Delta Lake数据表上进行简单,可靠的更新和删除操作

在本章中,我们将演示在飞机时刻表的场景中,如何在 Delta Lake 中使用 Python 和新的 Python API。我们将展示如何新增,更新和删除数据,如何使用 time travle 功能来查询旧版本数据,以及如何清理较旧的版本。
Delta Lake使用入门
Delta Lake 软件包可以通过 PySpark 的-- packages 选项来进行安装。在我们的示例中,我们还将演示在 VACUUM 文件和 Apache Spark 中执行 Delta Lake SQL 命令的功能。由于这是一个简短的演示,因此我们还将启用以下配置:
spark.databricks.delta.retentionDurationCheck.enabled=false
允许我们清理文件的时间短于默认的保留时间7天。注意,这仅是对于 SQL 命令 VACUUM 是必需的。
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
在 Apache Spark 中启用 Delta Lake SQL 命令;这对于 Python 或 Scala API 调用不是必需的。
# Using Spark Packages./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf “spark. databricks.delta.retentionDurationCheck.enabled=false” --conf “spark. sql.extensions=io.delta.sql.DeltaSparkSessionExtension”
Delta Lake数据的加载和保存
这次将使用准时飞行数据或离港延误数据,这些数据是从RITA BTS航班离岗统计中心生成的;这些数据的一些示例包括 2014 Flight Departure Performance via d3.js Crossfilter 和 针对Apache Spark的具有图形化结构的准时飞行数据。在 PySpark 中,首先读取数据集。
# Location variablestripdelaysFilePath = “/root/data/departuredelays.csv”pathToEventsTable = “/root/deltalake/departureDelays.delta”# Read flight delay datadepartureDelays = spark.read \.option(“header”, “true”) \.option(“inferSchema”, “true”) \.csv(tripdelaysFilePath)
接下来,我们将离港延迟数据保存到 Delta Lake 表中。在保存的过程中,我们能够利用它的优势功能,包括 ACID 事务,统一批处理,streaming 和 time travel 。
# Save flight delay data into Delta Lake formatdepartureDelays \.write \.format(“delta”) \.mode(“overwrite”) \.save(“departureDelays.delta”)
注意,这种方法类似于保存 Parquet 数据的常用方式。现在您将指定格式(“ delta ”)而不是指定格式(“ parquet ”)。如果要查看基础文件系统,您会注意到为 Delta Lake 的离港延迟表创建了四个文件。
/departureDelays.delta$ ls -l..._delta_logpart-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquetpart-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquetpart-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquetPart-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
现在,让我们重新加载数据,但是这次我们的数据格式将由 Delta Lake 支持。
# Load flight delay data in Delta Lake formatdelays_delta = spark \.read \.format(“delta”) \.load(“departureDelays.delta”)# Create temporary viewdelays_delta.createOrReplaceTempView(“delays_delta”)# How many flights are between Seattle and San Franciscospark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
运行结果:

最后,我们确定了从西雅图飞往旧金山的航班数量;在此数据集中,有1698个航班。
立马转换到Delta Lake
如果您有现成的 Parquet 表,则可以将它们转换为 Delta Lake 格式,从而无需重写表。如果要转换表,可以运行以下命令。
from delta.tables import *# Convert non partitioned parquet table at path ‘/path/to/table’deltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/ table`”)# Convert partitioned parquet table at path ‘/path/to/table’ andpartitioned by integer column named ‘part’partitionedDeltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/table`”, “part int”)
删除我们的航班数据
要从传统的数据湖表中删除数据,您将需要:
从表中选择所有数据,排除要删除的行
根据上面的查询创建一个新表
删除原始表
将新表重命名为原始表名,以获取下游依赖关系来代替执行所有这些步骤。使用 Delta Lake ,我们可以通过运行 DELETE 语句来简化此过程。 为了展示这一点,让我们删除所有早点或准点抵达的航班(即,延误<0)。
from delta.tables import *from pyspark.sql.functions import *# Access the Delta Lake tabledeltaTable = DeltaTable.forPath(spark, pathToEventsTable )# Delete all on-time and early flightsdeltaTable.delete(“delay < 0”)# How many flights are between Seattle and San Franciscospark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
运行结果:

从上面的查询中可以看到,我们删除了所有准时航班和早班航班(更多信息,请参见下文),从西雅图到旧金山的航班有837班延误。如果您查看文件系统,会注意到即使删除了一些数据,还是有更多文件。
/departureDelays.delta$ ls -l_delta_logpart-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquetpart-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquetpart-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquetpart-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquetpart-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquetpart-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquetpart-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquetpart-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet
在传统的数据湖中,删除是通过重写整个表(不包括要删除的值)来执行的。使用 Delta Lake 可以通过有选择地写入包含要删除数据的文件的新版本来执行删除操作,同时仅将以前的文件标记为已删除。这是因为 Delta Lake 使用多版本并发控制( MVCC )对表执行原子操作:例如,当一个用户正在删除数据时,另一用户可能正在查询之前的版本。这种多版本模型还使我们能够回溯时间(即 time travel )并查询以前的版本,这个功能稍后我们将看到。
更新我们的航班数据
要更新传统数据湖表中的数据,您需要:
从表中选择所有数据,不包括想要修改的行。
修改需要更新/更改的行
合并这两个表以创建一个新表
删除原始表
将新表重命名为原始表名,以实现下游依赖
代替上面的步骤,使用 Delta Lake 我们可以通过运行 UPDATE 语句来简化此过程。为了显示这一点,让我们更新所有从底特律到西雅图的航班。
# Update all flights originating from Detroit to now beoriginating from SeattledeltaTable.update(“origin = ‘DTW’”, { “origin”: “’SEA’” } )# How many flights are between Seattle and San Franciscospark.sql(“select count(1) from delays_delta where origin = ‘SEA’and destination = ‘SFO’”).show()
运行结果:

如今底特律航班已被标记为西雅图航班,现在我们有986航班从西雅图飞往旧金山。如果您要列出您的离岗延迟文件系统(即 $ ../departureDelays/ls -l ),您会注意到现在有11个文件(而不是删除文件后的8个文件和表创建后的4个文件)。
合并我们的航班数据
使用数据湖时,常见的情况是将数据连续追加到表中。这通常会导致数据重复(您不想再次将其插入表中),需要插入的新行以及一些需要更新的行。使用 Delta Lake ,所有这些都可以通过使用合并操作(类似于 SQL MERGE 语句)来实现。
让我们从一个样本数据集开始,您将通过以下查询对其进行更新,插入或删除重复数据。
# What flights between SEA and SFO for these date periodsspark.sql(“select * from delays_delta where origin = ‘SEA’ anddestination = ‘SFO’ and date like ‘1010%’ limit 10”).show()
该查询的输出如下表所示。请注意,已添加颜色编码以清楚地标识哪些行是已删除的重复数据(蓝色),已更新的数据(黄色)和已插入的数据(绿色)。

接下来,让我们生成自己的 merge_table ,其中包含将插入,更新或删除重复的数据。具体看以下代码段:
items = [(1010710, 31, 590, ‘SEA’, ‘SFO’), (1010521, 10, 590, ‘SEA’, ‘SFO’),(1010822, 31, 590, ‘SEA’, ‘SFO’)]cols = [‘date’, ‘delay’, ‘distance’, ‘origin’, ‘destination’]merge_table = spark.createDataFrame(items, cols)merge_table.toPandas()

在上表( merge_table )中,有三行不同的日期值:
1010521:此行需要使用新的延迟值(黄色)更新排期表。
1010710:此行是重复的(蓝色)
1010832:这是要插入的新行(绿色)
使用 Delta Lake ,可以通过合并语句轻松实现,具体看下面代码片段:
# Merge merge_table with flightsdeltaTable.alias(“flights”) \.merge(merge_table.alias(“updates”),”flights.date = updates.date”) \.whenMatchedUpdate(set = { “delay” : “updates.delay” } ) \ .whenNotMatchedInsertAll() \.execute()# What flights between SEA and SFO for these date periodsspark.sql(“select * from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()
一条语句即可有效完成删除重复数据,更新和插入这三个操作。

查看数据表历史记录
如前所述,在我们进行每个事务(删除,更新)之后,在文件系统中创建了更多文件。这是因为对于每个事务,都有不同版本的 Delta Lake 表。
这可以通过使用 DeltaTable.history () 方法看到,如下所示。

注意,您还可以使用 SQL 执行相同的任务:
spark.sql(“DESCRIBE HISTORY ‘” + pathToEventsTable + “’”).show()
如您所见,对于每个操作(创建表,删除和更新),都有三行代表表的不同版本(以下为简化版本,以帮助简化阅读):

回溯数据表的历史
借助 Time Travel ,您可以查看带有版本或时间戳的 Delta Lake 表。要查看历史数据,请指定版本或时间戳选项。在以下代码段中,我们将指定版本选项。
# Load DataFrames for each versiondfv0 = spark.read.format(“delta”).option(“versionAsOf”, 0).load(“departureDelays.delta”)dfv1 = spark.read.format(“delta”).option(“versionAsOf”, 1).load(“departureDelays.delta”)dfv2 = spark.read.format(“delta”).option(“versionAsOf”, 2).load(“departureDelays.delta”)# Calculate the SEA to SFO flight counts for each version of historycnt0 = dfv0.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()cnt1 = dfv1.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()cnt2 = dfv2.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()# Print out the valueprint(“SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s” % (cnt0, cnt1, cnt2))## OutputSEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
无论是用于治理,风险管理,合规( GRC )还是错误时进行回滚, Delta Lake 表都包含元数据(例如,记录操作员删除的事实)和数据(例如,实际删除的行)。但是出于合规性或大小原因,我们如何删除数据文件?
使用 vacuum 清理旧版本的数据表
默认情况下,Delta Lake vacuum 方法将删除所有超过7天参考时间的行(和文件)。如果要查看文件系统,您会注意到表的11个文件。
/departureDelays.delta$ ls -l _delta_logpart-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquetpart-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquetpart-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquetpart-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquetpart-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquetpart-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquetpart-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquetpart-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquetpart-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquetpart-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquetPart-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
要删除所有文件,以便仅保留当前数据快照,您可以 vacuum 方法指定一个较小的值(而不是默认保留7天)。
# Remove all files older than 0 hours old.deltaTable.vacuum(0)Note, you perform the same task via SQL syntax: ̧# Remove all files older than 0 hours oldspark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)
清理完成后,当您查看文件系统时,由于历史数据已被删除,您会看到更少的文件。
/departureDelays.delta$ ls -l_delta_logpart-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquetpart-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquetpart-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquetpart-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
请注意,运行 vacuum 之后,回溯到比保留期更早的版本的功能将会失效。

Chapter-03 大型数据湖的 Time Travel 功能

Delta Lake 提供 Time Travel 功能。Delta Lake 是一个开源存储层,可为数据湖带来可靠性。Delta Lake 提供 ACID 事务,可伸缩的元数据处理,以及批流一体数据处理。Delta Lake 在您现有的数据湖之上运行,并且与 Apache Spark API 完全兼容。
使用此功能,Delta Lake 会自动对您存储在数据湖中的大数据进行版本控制,同时您可以访问该数据的任何历史版本。这种临时数据管理可以简化您的数据管道,包括简化审核,在误写入或删除的情况下回滚数据以及重现实验和报告。
您的组织最终可以在一个干净,集中化,版本化的云上大数据存储库上实现标准化,以此进行分析。
更改数据的常见挑战
审核数据更改:审核数据更改对于数据合规性以及简单的调试(以了解数据如何随时间变化)都至关重要。在这种情况下,传统数据系统都转向大数据技术和云服务。
重现实验和报告:在模型训练期间,数据科学家对给定的数据集执行不同参数的各种实验。当科学家在一段时间后重新访问实验以重现模型时,通常源数据已被上游管道修改。很多时候他们不知道这些上游数据发生了更改,因此很难重现他们的实验。一些科学家和最好的工程师通过创建数据的多个副本来进行实践,从而增加了存储量的费用。对于生成报告的分析师而言,情况也是如此。
回滚:数据管道有时会向下游消费者写入脏数据。发生这种情况的原因可能是基础架构不稳定或者混乱的数据或者管道中的 Bug 等问题。对目录或表进行简单追加的管道,可以通过基于日期的分区轻松完成回滚。随着更新和删除,这可能变得非常复杂,数据工程师通常必须设计复杂的管道来应对这种情况。

使用 Time Travel 功能
Delta Lake 的 time travel 功能简化了上述用例的数据管道构建。Delta Lake 中的 Time Travel极大地提高了开发人员的生产力。它有助于:
数据科学家可以更好地管理实验
数据工程师简化了管道同时可以回滚脏数据
数据分析师可以轻松地分析报告
企业最终可以在干净,集中化,版本化的云存储中的大数据存储库上建立标准化,在此基础上进行数据分析。我们很高兴看到您将能够使用此功能完成工作。
当您写入 Delta Lake 表或目录时,每个操作都会自动进行版本控制。您可以通过两种不同的方式访问数据的不同版本:

使用时间戳
Scala语法
您可以将时间戳或日期字符串作为DataFrame阅读器的选项来提供:
val df = spark.read.format(“delta”) .option(“timestampAsOf”, “2019-01-01”).load(“/path/to/my/table”)df = spark.read \.format(“delta”) \.option(“timestampAsOf”, “2019-01-01”) \.load(“/path/to/my/table”)SQL语法SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01”SELECT count(*) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01 01:30:00.000”
如果您无权访问阅读器的代码库,您可以将输入参数传递给该库以读取数据,通过将 yyyyMMddHHmmssSSS 格式的时间戳传递给表来进行数据回滚:
val inputPath = “/path/to/my/table@20190101000000000”val df = loadData(inputPath)// Function in a library that you don’t have access to def loadData(inputPath : String) : DataFrame = {spark.read.format(“delta”).load(inputPath)}inputPath = “/path/to/my/table@20190101000000000”df = loadData(inputPath)# Function in a library that you don’t have access todef loadData(inputPath):return spark.read \.format(“delta”) \.load(inputPath)}

使用版本号
在 Delta Lake 中,每次写入都有一个版本号,您也可以使用该版本号来进行回溯。
Scala 语法
val df = spark.read.format(“delta”).option(“versionAsOf”, “5238”).load(“/path/to/my/table”)val df = spark.read.format(“delta”).load(“/path/to/my/table@v5238”)
Python 语法
df = spark.read \.format(“delta”) \.option(“versionAsOf”, “5238”) \.load(“/path/to/my/table”)df = spark.read \.format(“delta”) \.load(“/path/to/my/table@v5238”)
SQL 语法
SELECT count(*) FROM my_table VERSION AS OF 5238

审核数据变更
您可以使用 DESCRIBE HISTORY 命令或通过 UI 来查看表更改的历史记录。
重做实验和报告
Time travel 在机器学习和数据科学中也起着重要作用。模型和实验的可重复性是数据科学家的关键考虑因素,因为他们通常在投入生产之前会创建数百个模型,并且在那个耗时的过程中,有可能想回到之前早期的模型。但是由于数据管理通常与数据科学工具是分开的,因此确实很难实现。
Databricks 将 Delta Lake 的 Time Travel 功能与 MLflow (机器学习生命周期的开源平台)相集成来解决可重复实验的问题。为了重新进行机器学习培训,您只需将带有时间戳的 URL 路径作为 MLflow 参数来跟踪每个训练作业的数据版本。
这使您可以返回到较早的设置和数据集以重现较早的模型。您无需与上游团队就数据进行协调,也不必担心为不同的实验克隆数据。这就是统一分析的力量,数据科学与数据工程紧密结合在一起。
回滚
Time travel 可以在产生脏数据的情况下方便回滚。例如,如果您的 GDPR 管道作业有一个意外删除用户信息的 bug ,您可以用下面方法轻松修复管道:
INSERT INTO my_tableSELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)WHERE userId = 111You can also fix incorrect updates as follows:MERGE INTO my_table targetUSING my_table TIMESTAMP AS OF date_sub(current_date(), 1) sourceON source.userId = target.userIdWHEN MATCHED THEN UPDATE SET *
如果您只想回滚到表的之前版本,则可以使用以下任一命令来完成:
RESTORE TABLE my_table VERSION AS OF [version_number]RESTORE TABLE my_table TIMESTAMP AS OF [timestamp]
固定视图的不断更新跨多个下游作业的 Delta Lake 表
通过 AS OF 查询,您现在可以为多个下游作业固定不断更新的 Delta Lake 表的快照。考虑一种情况,其中 Delta Lake 表正在不断更新,例如每15秒更新一次,并且有一个下游作业会定期从此 Delta Lake 表中读取数据并更新不同的目标表。在这种情况下,通常需要一个源 Delta Lake表的一致视图,以便所有目标表都反映相同的状态。
现在,您可以按照下面的方式轻松处理这种情况:
version = spark.sql(“SELECT max(version) FROM (DESCRIBE HISTORY my_table)”).collect()# Will use the latest version of the table for all operations belowdata = spark.table(“my_table@v%s” % version[0][0]data.where(“event_type = e1”).write.jdbc(“table1”)data.where(“event_type = e2”).write.jdbc(“table2”)...data.where(“event_type = e10”).write.jdbc(“table10”)
时间序列分析查询变得简单
Time travel 还简化了时间序列分析。例如,如果您想了解上周添加了多少新客户,则查询可能是一个非常简单的方式,如下所示:
SELECT count(distinct userId) - (SELECT count(distinct userId)FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))FROM my_table
后续
本系列还包括其他内容,欢迎持续关注:
第一章:基础和性能
第二章:特性
01、【详谈 Delta Lake】系列专题 之 特性 - 01 为什么使用 Delta Lake 的 MERGE 功能?>>
02、(本文)使用 Python API 在 Delta Lake 数据表上操作/大型数据湖的 Time Travel 功能
03、 轻松克隆 Delta Lake / 在 Apache Spark 上的 Delta Lake 中启用 Spark SQL 语句
第三章:Lakehouse
第四章:Streaming
第五章:客户用例
也可点击文章下方阅读原文,查看所有本系列文章~
获取更详细的 Databricks 数据洞察 相关信息,可至产品详情页查看:
https://www.aliyun.com/product/bigdata/spark
阿里巴巴开源大数据技术团队成立 Apache Spark 中国技术社区,定期推送精彩案例,技术专家直播,只为营造纯粹的 Spark 氛围,欢迎关注公众号!





