暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【译】Delta Lake 0.5.0介绍

云原生数据湖 2020-03-17
275

原文链接:https://databricks.com/blog/2020/01/29/query-delta-lake-tables-presto-athena-improved-operations-concurrency-merge-performance.html

文章转自:https://yq.aliyun.com/articles/748544?spm=a2c4e.11163080.searchblog.95.4b412ec1W8FiXs

最近,Delta Lake发布了0.5.0版本,该版本加入了对Presto和Athena的支持,以及提升了操作的并发性,本文将对Delta Lake 0.5.0版本的变化进行一个简单的介绍。

Delta Lake 0.5.0发布的几个最重要的特性如下:

  • 通过使用Manifest文件能够,支持其他数据处理引擎,现在能够使用Scala、Java、Python和SQL的API生成Manifest文件,并使用该文件通过Presto和Amazon Athena访问Delta Lake中的表数据,详细的使用方式见 https://docs.delta.io/0.5.0/presto-integration.html
  • 提升了Delta Lake所有操作的并发性,现在可以并发执行更多的Delta Lake操作。通过使用更加精细的冲突检测策略,Delta Lake的乐观并发控制得到了有效的改善,这使得我们能够更加容易地在Delta表上执行复杂的工作流。举例来说:
    1. 我们可以在给新分区添加内容的同时,在旧分区上执行delete操作;
    2. 在不相交的一组分区上同时执行update和merge操作;
    3. 让文件合并和增加文件内容同时执行

想要获取更多的信息,可以参考开源Delta Lake 0.5.0的release notes。在此博客文章中,我们将详细介绍如何使用Presto读取Delta Lake表、操作并发性的提升以及使用insert-only merge操作来更方便快速地去除重复数据。

使用Presto读取Delta Lake表

正如这篇文章《Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs》 所说,Delta Lake 的一些修改数据的操作,如delete操作,是通过给包含删除数据的文件写一个新版本,并只是将旧版本文件标记为已删除来实现的。Delta Lake采用这种方法的优势在于能够让我们查询旧版本的数据。如果我们想要了解哪些数据(或行)包含最新的数据,默认情况下我们可以去查询事务日志。其他数据处理系统,如Presto和Athena想要获取这些信息,可以通过读取Delta Lake生成的一种清单文本文件——Manifest,该文件中包含查询Delta Lake表需要读取的数据文件列表。为了实现Presto和Athena读取Delta Lake表,我们可以通过执行一些Python命令来实现,详细的内容可以参考《 Set up the Presto or Athena to Delta Lake integration and query Delta tables 》。

生成Delta Lake的Manifest文件

首先,使用以下代码片段创建Delta Lake的Manifest文件:

    deltaTable = DeltaTable.forPath(pathToDeltaTable)
    deltaTable.generate("symlink_format_manifest")

    正如代码字面意思所示,以上操作将在表根目录中生成Manifest文件。如果你根据 《Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs 》这篇文章介绍的内容创建了departureDelays
    表,将会在表根目录中产生一个新的文件夹:

      $/departureDelays.delta/_symlink_format_manifest

      该文件夹中会有一个名为manifest的文本文件。如果你查看manifest文件的内容(例如使用cat命令),你将能看到类似以下的文本内容,它们指示了包含最新快照的文件。

        file:$/departureDelays.delta/part-00003-...-c000.snappy.parquet
        file:$/departureDelays.delta/part-00006-...-c000.snappy.parquet
        file:$/departureDelays.delta/part-00001-...-c000.snappy.parquet
        file:$/departureDelays.delta/part-00000-...-c000.snappy.parquet
        file:$/departureDelays.delta/part-00000-...-c000.snappy.parquet
        file:$/departureDelays.delta/part-00001-...-c000.snappy.parquet
        file:$/departureDelays.delta/part-00002-...-c000.snappy.parquet
        file:$/departureDelays.delta/part-00007-...-c000.snappy.parquet

        创建Presto表以读取生成的Manifest文件

        接下来的步骤是在Hive Metastore中创建一个外部表,以便Presto(或Athena)可以读取上一步生成的Manifest文件,来获得需要读取的Parquet文件,以读取Delta Lake表的最新快照。需要说明的是,对于Presto,你可以使用Apache Spark或Hive CLI来运行以下命令:

          1. CREATE EXTERNAL TABLE departureDelaysExternal ( ... )
          2. ROW FORMAT SERDE
          'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
          3. STORED AS INPUTFORMAT
          4. OUTPUTFORMAT
          'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
          5. LOCATION '$/departureDelays.delta/_symlink_format_manifest'

          一些重要的说明:

          • 第一行所定义的schema必须和Delta Lake中的schema相同
          • 第五行需要指向Manifest文件的位置——_symlink_format_manifest
            Presto(或Athena)需要配置SymlinkTextInputFormat
            才能从Manifest文件中获取Parquet数据文件的列表,而不是使用目录列表中的文件。需要说明的是,如果想要使用分区表,需要按照《Configure Presto to read the generated manifests》这篇文章进行一些额外的步骤。

          更新Manifest文件

          需要注意的是,如果Delata Lake的数据有更新,都需要重新生成Manifest文件,以便Presto能够获取到最新的数据。

          操作并发性的提升

          在Delta Lake 0.5.0版本,我们能够同时执行更多的操作。通过更细粒度的冲突检测,这些最新的更新让Delta Lake能够更容易地在Delta Lake表上执行复杂的工作流,例如:

          • 可以在给新分区添加内容的同时,在旧分区上执行delete操作;
          • 追加文件内容的同时执行文件合并操作;
          • 在不相交的一组分区上同时执行update和merge操作。

          并发追加文件内容的用例

          举个例子,当我们在执行merge操作的同时,如果有并发的事务向同一个分区写入记录,Delta Lake往往会抛出ConcurrentAppendException
          异常。

            // Target 'deltaTable' is partitioned by date and country
            deltaTable.as("t").merge(
            source.as("s"),
            "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
            .whenMatched().updateAll()
            .whenNotMatched().insertAll()
            .execute()

            上面的代码段就有可能会引发冲突,因为即使表已经按照date和country进行了分区,条件仍然不够明确。问题在于,这个查询将扫描整个表,从而可能与update任何其他分区的并发操作发生冲突。通过指定specificDate
            specificCountry
            ,以便可以在特定的date和country进行merge操作,现在我们就可以安全地在不同的date和contry同时执行此操作。

              // Target 'deltaTable' is partitioned by date and country
              deltaTable.as("t").merge(
              source.as("s"),
              "s.user_id = t.user_id AND d.date = '" + specificDate + "' AND d.country = '" + specificCountry + "'")
              .whenMatched().updateAll()
              .whenNotMatched().insertAll()
              .execute()

              以上方法适用于其他所有的Delta Lake操作(如delete、更改元数据等)。

              并发文件合并

              如果你连续不断地将数据写入Delta表,随着时间的流逝,将会累积出大量的文件。这在流式数据场景中尤为重要,因为此时是以比较小的batch写入数据的,这将会导致文件系统不断地累积小文件,随着时间的推移,小文件的数量会不断增加,会降低查询的性能。优化这种场景的一个比较重要的方式就是定期获取大批量的小文件,并将其重写为数量比较小的大文件(文件合并)。过去,在同时进行数据查询和执行文件合并时,出现异常的可能性会非常高。但是现在,由于Delta Lake 0.5.0版本的优化改进,我们可以同时执行查询操作(包括流式查询)和文件的合并,并且不会有任何异常产生。举个例子来说,如果你的表已经进行了分区,并且你只想基于谓词对一个分区进行重新分区,则可以使用where来仅读取该分区,并使用replaceWhere回写该分区:

                path = "..."
                partition = "year = '2019'"
                numFilesPerPartition = 16 # Compact partition of a table to no. of files


                (spark.read
                .format("delta")
                .load(path)
                .where(partition)
                .repartition(numFilesPerPartition)
                .write
                .option("dataChange", "false")
                .format("delta")
                .mode("overwrite")
                .option("replaceWhere", partition)
                .save(path))

                以上代码中需要注意的是,仅在没有数据更改时,才使用dataChange == false选项,否则可能会破坏底层数据。

                使用Insert-only Merge操作方便快速地去除重复数据

                一个场景的ETL用例是搜集日志,并将其附加到Delta Lake表当中,一个比较常见的问题是数据源会产生重复的日志记录。通过使用Delta Lake的merge,你可以避免插入这些重复的记录,例如以下涉及merge以及update航班数据的代码:

                  # Merge merge_table with flights
                  deltaTable.alias("flights") \
                  .merge(merge_table.alias("updates"),"flights.date = updates.date") \
                  .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
                  .whenNotMatchedInsertAll() \
                  .execute()

                  在Delta Lake 0.5.0版本之前,不可能从Delta Lake表中将重复数据作为流进行读取,因为insert-only merge并不是纯粹地将数据追加到表中。例如,在流查询中,你可以在foreachBatch
                  中执行merge操作来连续不断地将流数据写入Delta Lake表当中,并将需要删除的重复数据打上标记。以下PySpark的代码展示了这个场景:

                    from delta.tables import *


                    deltaTable = DeltaTable.forPath(spark, "/data/aggregates")


                    # Function to upsert microBatchOutputDF into Delta table using merge
                    def upsertToDelta(microBatchOutputDF, batchId):
                    deltaTable.alias("t").merge(
                    microBatchOutputDF.alias("s"),
                    "s.key = t.key") \
                    .whenMatchedUpdateAll() \
                    .whenNotMatchedInsertAll() \
                    .execute()
                    }


                    # Write the output of a streaming aggregation query into Delta table
                    streamingAggregatesDF.writeStream \
                    .format("delta") \
                    .foreachBatch(upsertToDelta) \
                    .outputMode("update") \
                    .start()

                    在另一个流式查询中,你可以从该Delta Lake表中连续读取需要删除的重复数据。这是可能的,因为insert-only merge操作(在Delta Lake 0.5.0版本引入)只会将新数据追加到Delta Lake表中。

                    欢迎关注数据湖技术社区

                    数据湖开发者社区由 阿里云开发者社区 与 阿里云Data Lake Analytics团队 共同发起,致力于推广数据湖相关技术,包括hudi、delta、spark、presto、oss、元数据、存储加速、格式发现等,学习如何构建数据湖分析系统,打造适合业务的数据架构。扫描下方钉钉群二维码,加入社区一起学习讨论。

                    阿里云Data Lake Analytics是Serverless化的交互式联邦查询服务。使用标准SQL即可轻松分析与集成对象存储(OSS)、数据库(PostgreSQL/MySQL等)、NoSQL(TableStore等)数据源的数据。

                    Data Lake Analytics产品详情页:https://www.aliyun.com/product/datalakeanalytics

                    Data Lake Analytics 1元购入口:https://common-buy.aliyun.com/?commodityCode=openanalytics_post#/buy



                    文章转载自云原生数据湖,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论