1: Choose an appropriate partition column
A Delta Lake table can be stored partitioned; the most common partition column is, of course, time. Follow these rules of thumb when deciding which column to partition by:
# Do not partition by a column with high cardinality (many distinct values). For example, partitioning by a userid column that can have a very large number (e.g. 1M) of distinct user IDs is a poor partitioning strategy.
# Prefer columns with many repeated values, such as a date column.
# As the data grows, the original partitioning may no longer fit the data; consider repartitioning during idle periods.
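As an illustration, here is a minimal sketch of writing a table partitioned by a low-cardinality date column; `df`, the path and the `date` column are assumptions, not from the original text:

// Minimal sketch: partition a Delta table by a low-cardinality date column.
// `df` is assumed to be an existing DataFrame that contains a `date` column.
df.write
  .format("delta")
  .partitionBy("date")      // one directory per distinct date value
  .mode("overwrite")
  .save("/delta/events")    // illustrative path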
2: Compact files
spark.read
  .format("delta")
  .load(path)
  .repartition(numFiles)
  .write
  .option("dataChange", "false")
  .format("delta")
  .mode("overwrite")
  .save(path)
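If the table is partitioned, the same pattern can be restricted to a single partition with replaceWhere, as described in the Delta documentation; a sketch in which the path, the partition predicate and the file count are illustrative:

val path = "/delta/events"                 // illustrative path
val partition = "date = '2017-01-01'"      // illustrative partition predicate
val numFilesPerPartition = 16

spark.read
  .format("delta")
  .load(path)
  .where(partition)
  .repartition(numFilesPerPartition)
  .write
  .option("dataChange", "false")           // compaction only, no logical data change
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", partition)       // only rewrite this partition
  .save(path)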
3: Update table data
val table = DeltaTable.forPath(path)
table.updateExpr("id = 2", Map("info" -> "'hello'"))

Under the hood, an update is remove + add: the affected data files are removed and rewritten with the new values.
4: Delete files (vacuum)
# Vacuuming data younger than 168 hours (the default safety period) requires setting spark.databricks.delta.retentionDurationCheck.enabled=false:
deltaTable.vacuum(1)
# To lower the global safety period to a smaller value, set it in spark-defaults.conf, for example:
spark.databricks.delta.properties.defaults.deletedFileRetentionDuration=1 hour
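Putting the pieces together, a minimal sketch of an aggressive vacuum; the table path is illustrative, and keeping only one hour of history with the safety check disabled is risky if there are concurrent readers or writers:

import io.delta.tables._

// Disable the safety-period check so that vacuum accepts a retention below 168 hours.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

val deltaTable = DeltaTable.forPath(spark, "/delta/events")   // illustrative path
deltaTable.vacuum(1)   // physically delete tombstoned files older than 1 hour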
5: Several important parameters
# spark.databricks.delta.snapshotPartitions
Default: 10. The number of partitions used for the Delta log metadata. Increase it when the Delta log is very large, and decrease it when the log is small. This value has a significant impact on how fast the Delta table state is reconstructed.
# spark.databricks.delta.retentionDurationCheck.enabled
Default: true. Whether to check the safety period when vacuuming tombstone files.
# spark.databricks.delta.schema.autoMerge.enabled
Default: false. Delta validates that written data matches the table's schema, which guarantees the correctness of the written data. When the schema of your data changes, you must explicitly set mergeSchema to true in the write options. If you want schema changes to be merged automatically, set this value to true; however, we still recommend specifying mergeSchema explicitly rather than relying on automatic schema merging.
# spark.databricks.delta.properties.defaults.deletedFileRetentionDuration or delta.deletedFileRetentionDuration
Default: interval 1 week. The safety period of Delta tombstone files. Vacuuming tombstone files that have not exceeded the safety period throws an exception (when spark.databricks.delta.retentionDurationCheck.enabled is true).
# spark.databricks.delta.properties.defaults.logRetentionDuration or delta.logRetentionDuration
Default: interval 30 days. The expiration time of Delta log files. A Delta log file is considered expired when the data files it refers to have already been compacted and the file is older than the expiration time above. Whenever the Delta log performs a checkpoint, it checks for expired files and deletes them so that the Delta log does not grow without bound.
# spark.sql.sources.parallelPartitionDiscovery.parallelism
Default: 1000. The parallelism Delta uses when scanning files. Decrease it when the number of files is small. Currently it is only used by vacuum; a poor setting hurts the efficiency of vacuum's file scan.
# Delta Lake DDL operations require the following configuration (see the session sketch below):
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
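For reference, a minimal sketch of a session built with the DDL-related settings above; the application name is arbitrary:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-demo")   // arbitrary name
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()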
6: Convert a Parquet table to a Delta table
val dt = DeltaTable.convertToDelta(sparkSession, "parquet.`/delta/events`")
dt.toDF.show()
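If the Parquet table is partitioned, convertToDelta also accepts a partition schema; a sketch assuming a hypothetical table partitioned by a single `date` column:

import io.delta.tables._

// Assumed layout: /delta/events/date=2017-01-01/part-....parquet
val dt = DeltaTable.convertToDelta(sparkSession, "parquet.`/delta/events`", "date DATE")
dt.toDF.show()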
7: Parameters for integrating Delta Lake with other storage systems
spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore   // S3
spark.delta.logStore.class=org.apache.spark.sql.delta.storage.AzureLogStore            // Azure storage
spark.delta.logStore.class=org.apache.spark.sql.delta.storage.HDFSLogStore             // HDFS
8: Metadata changes
The merge operation
merge is one of the most important Delta Lake operations; it implements upsert and delete (see the basic table operations for examples).
# A merge has at most two whenMatched clauses and one whenNotMatched clause, and it must have at least one when clause.
# If there are two whenMatched clauses, the first one must carry a condition, otherwise an error is thrown (illustrated in the sketch after this list).
# whenNotMatched may or may not carry a condition.
# whenMatched allows at most one update action and one delete action, whenNotMatched allows at most one insert action, and the same kind of action can appear only once per when clause.
# Only matched rows can be deleted; deleting rows that did not match is not supported. Matched rows can only be updated, and unmatched rows cannot be updated, only inserted.
# merge is implemented with an inner join, a left anti join and a full join (see the source-code analysis).
Schema-related write options:
option("mergeSchema", true)
option("overwriteSchema", true)
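To illustrate the rule that with two whenMatched clauses the first must carry a condition, here is a minimal sketch; the path, the `updatesDF` DataFrame and the column names are assumptions:

import io.delta.tables._

// updatesDF: assumed DataFrame with columns (eventId, data, deleted)
val target = DeltaTable.forPath(spark, "/delta/events")   // illustrative path

target.as("t")
  .merge(updatesDF.as("s"), "t.eventId = s.eventId")
  .whenMatched("s.deleted = true")   // first whenMatched: a condition is mandatory here
  .delete()
  .whenMatched()                     // second whenMatched may be unconditional
  .updateExpr(Map("data" -> "s.data"))
  .whenNotMatched()                  // unmatched source rows can only be inserted
  .insertExpr(Map("eventId" -> "s.eventId", "data" -> "s.data"))
  .execute()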
9: Read older versions of data with time travel
val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
val df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df.show()
df1.show()
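To find out which versions and timestamps are available for time travel, the table history can be inspected first; a minimal sketch:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
// Each commit shows up with its version, timestamp and operation type.
deltaTable.history().select("version", "timestamp", "operation").show(false)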
10: Insert, delete, update, and query
# delete
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01")

# Upsert into a table using merge
import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ... // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(Map(
    "date" -> "updates.date",
    "eventId" -> "updates.eventId",
    "data" -> "updates.data"))
  .execute()

# update
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'"))

import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits
  col("eventType") === "clck",
  Map("eventType" -> lit("click")))

# Data deduplication when writing into a Delta table
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

# Deduplicate only over the last seven days
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

# SCD (slowly changing dimension) writes into a Delta table
val customersTable: DeltaTable = ...   // table with schema (customerId, address, current, effectiveDate, endDate)
val updatesDF: DataFrame = ...         // DataFrame with schema (customerId, address, effectiveDate)

// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = updatesDF
  .as("updates")
  .join(customersTable.toDF.as("customers"), "customerid")
  .where("customers.current = true AND updates.address <> customers.address")

// Stage the update by unioning two sets of rows
// 1. Rows that will be inserted in the whenNotMatched clause
// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
val stagedUpdates = newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")   // Rows for 1.
  .union(
    updatesDF.selectExpr("updates.customerId as mergeKey", "*")   // Rows for 2.
  )

// Apply SCD Type 2 operation using merge
customersTable
  .as("customers")
  .merge(
    stagedUpdates.as("staged_updates"),
    "customers.customerId = mergeKey")
  .whenMatched("customers.current = true AND customers.address <> staged_updates.address")
  .updateExpr(Map(   // Set current to false and endDate to source's effective date.
    "current" -> "false",
    "endDate" -> "staged_updates.effectiveDate"))
  .whenNotMatched()
  .insertExpr(Map(
    "customerid" -> "staged_updates.customerId",
    "address" -> "staged_updates.address",
    "current" -> "true",
    "effectiveDate" -> "staged_updates.effectiveDate",   // Set current to true along with the new address and its effective date.
    "endDate" -> "null"))
  .execute()

# Write change data into a Delta table
val deltaTable: DeltaTable = ...   // DeltaTable with schema (key, value)

// DataFrame with changes having following columns
// - key: key of the change
// - time: time of change for ordering between changes (can be replaced by another ordering id)
// - newValue: updated or inserted value if key was not deleted
// - deleted: true if the key was deleted, false if the key was inserted or updated
val changesDF: DataFrame = ...

// Find the latest change for each key based on the timestamp
// Note: For nested structs, max on struct is computed as
// max on first struct field, if equal fall back to second fields, and so on.
val latestChangeForEachKey = changesDF
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols")
  .groupBy("key")
  .agg(max("otherCols").as("latest"))
  .selectExpr("key", "latest.*")

deltaTable.as("t")
  .merge(
    latestChangeForEachKey.as("s"),
    "s.key = t.key")
  .whenMatched("s.deleted = true")
  .delete()
  .whenMatched()
  .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .whenNotMatched("s.deleted = false")
  .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .execute()

# Upsert from a streaming query using foreachBatch
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

# Ignore updates and deletes
spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/delta/user_events")

spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/delta/user_events")

# Conditionally update, delete, and merge (upsert) data into a table (Python)
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()
11: Source-code analysis of the Spark Delta Lake transaction log implementation
How Delta Lake implements transactions for data updates. The entry point of a transaction-log commit is org.apache.spark.sql.delta.OptimisticTransaction#commit; every operation that persists transaction-log records goes through this function. The commit function is implemented as follows:
def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
    deltaLog,
    "delta.commit") {
  val version = try {
    // Some work has to be done before the commit: for instance, if this is the first
    // update the Protocol must be initialized, and the user's table configuration
    // must be persisted into the transaction log.
    var finalActions = prepareCommit(actions, op)

    // If this update removes existing files, isBlindAppend is false; otherwise it is true.
    val isBlindAppend = {
      val onlyAddFiles =
        finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
      onlyAddFiles && !dependsOnFiles
    }

    // If commitInfo.enabled is true, a CommitInfo record is also written to the transaction log.
    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
      commitInfo = CommitInfo(
        clock.getTimeMillis(),
        op.name,
        op.jsonEncodedValues,
        Map.empty,
        Some(readVersion).filter(_ >= 0),
        None,
        Some(isBlindAppend))
      finalActions = commitInfo +: finalActions
    }

    // Actually write the transaction log; on version conflicts, retry until the write succeeds.
    val commitVersion = doCommit(snapshot.version + 1, finalActions, 0)
    logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
    // Checkpoint the transaction log if needed.
    postCommit(commitVersion, finalActions)
    commitVersion
  } catch {
    case e: DeltaConcurrentModificationException =>
      recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
      throw e
    case NonFatal(e) =>
      recordDeltaEvent(
        deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
      throw e
  }
  version
}

Let us start with the two parameters of this function.
actions: Seq[Action]: the new files (AddFile) produced by the update and the list of files to remove (RemoveFile). For Structured Streaming jobs a SetTransaction record is also included, which stores the job's query id (sql.streaming.queryId), the batchId and the current time. These are the records that need to be persisted into the transaction log.
op: DeltaOperations.Operation: the type of Delta operation, such as WRITE, STREAMING UPDATE, DELETE, MERGE or UPDATE.
The commit function has three main steps: prepareCommit, doCommit and postCommit. prepareCommit is implemented as follows:

protected def prepareCommit(
    actions: Seq[Action],
    op: DeltaOperations.Operation): Seq[Action] = {

  assert(!committed, "Transaction already committed.")

  // If the table's Metadata was updated, it must be written to the transaction log.
  var finalActions = newMetadata.toSeq ++ actions
  val metadataChanges = finalActions.collect { case m: Metadata => m }
  assert(metadataChanges.length <= 1,
    "Cannot change the metadata more than once in a transaction.")
  metadataChanges.foreach(m => verifyNewMetadata(m))

  // On the very first commit, make sure the _delta_log directory exists, then check
  // whether finalActions contains a Protocol; if not, initialize the protocol version.
  if (snapshot.version == -1) {
    deltaLog.ensureLogDirectoryExist()
    if (!finalActions.exists(_.isInstanceOf[Protocol])) {
      finalActions = Protocol() +: finalActions
    }
  }

  finalActions = finalActions.map {
    // First commit and the action is a Metadata: merge the Delta Lake configuration into it.
    case m: Metadata if snapshot.version == -1 =>
      val updatedConf = DeltaConfigs.mergeGlobalConfigs(
        spark.sessionState.conf, m.configuration, Protocol())
      m.copy(configuration = updatedConf)
    case other => other
  }

  deltaLog.protocolWrite(
    snapshot.protocol,
    logUpgradeMessage = !actions.headOption.exists(_.isInstanceOf[Protocol]))

  // If actions contains removed files, check that this Delta table allows deletes.
  val removes = actions.collect { case r: RemoveFile => r }
  if (removes.exists(_.dataChange)) deltaLog.assertRemovable()

  finalActions
}
private def doCommit(
    attemptVersion: Long,
    actions: Seq[Action],
    attemptNumber: Int): Long = deltaLog.lockInterruptibly {
  try {
    logDebug(s"Attempting to commit version $attemptVersion with ${actions.size} actions")

    // Actually write the transaction log.
    deltaLog.store.write(
      deltaFile(deltaLog.logPath, attemptVersion),
      actions.map(_.json).toIterator)

    val commitTime = System.nanoTime()
    // Since data changed, refresh the in-memory snapshot of the transaction log and validate it.
    val postCommitSnapshot = deltaLog.update()
    if (postCommitSnapshot.version < attemptVersion) {
      throw new IllegalStateException(
        s"The committed version is $attemptVersion " +
          s"but the current version is ${postCommitSnapshot.version}.")
    }

    // Report some statistics.
    var numAbsolutePaths = 0
    var pathHolder: Path = null
    val distinctPartitions = new mutable.HashSet[Map[String, String]]
    val adds = actions.collect {
      case a: AddFile =>
        pathHolder = new Path(new URI(a.path))
        if (pathHolder.isAbsolute) numAbsolutePaths += 1
        distinctPartitions += a.partitionValues
        a
    }

    val stats = CommitStats(
      startVersion = snapshot.version,
      commitVersion = attemptVersion,
      readVersion = postCommitSnapshot.version,
      txnDurationMs = NANOSECONDS.toMillis(commitTime - txnStartNano),
      commitDurationMs = NANOSECONDS.toMillis(commitTime - commitStartNano),
      numAdd = adds.size,
      numRemove = actions.collect { case r: RemoveFile => r }.size,
      bytesNew = adds.filter(_.dataChange).map(_.size).sum,
      numFilesTotal = postCommitSnapshot.numOfFiles,
      sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
      protocol = postCommitSnapshot.protocol,
      info = Option(commitInfo).map(_.copy(readVersion = None, isolationLevel = None)).orNull,
      newMetadata = newMetadata,
      numAbsolutePaths,
      numDistinctPartitionsInAdd = distinctPartitions.size,
      isolationLevel = null)
    recordDeltaEvent(deltaLog, "delta.commit.stats", data = stats)

    attemptVersion
  } catch {
    case e: java.nio.file.FileAlreadyExistsException =>
      checkAndRetry(attemptVersion, actions, attemptNumber)
  }
}
This is where the transaction log is actually written. The store is determined by the spark.delta.logStore.class parameter; HDFS, S3, Azure and local storage are currently supported, and HDFS is the default. The concrete write operation is described below.
After the transaction log is persisted, the in-memory snapshot of the transaction log is refreshed and the relevant validity checks are performed.
Finally, some statistics are reported. This appears to be functionality used inside Databricks; in the open-source version it does not actually do much.
Now let us look at how the transaction log is actually written. For simplicity we look directly at the corresponding method in the HDFSLogStore class, mainly writeInternal, whose implementation is as follows:
private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = {
  // Get the HDFS FileContext used to write the transaction log.
  val fc = getFileContext(path)
  // If the transaction log file to write already exists, throw an exception so that we retry later.
  if (!overwrite && fc.util.exists(path)) {
    // This is needed for the tests to throw error with local file system
    throw new FileAlreadyExistsException(path.toString)
  }

  // The transaction log is first written to a temporary file.
  val tempPath = createTempPath(path)
  var streamClosed = false // This flag is to avoid double close
  var renameDone = false   // This flag is to save the delete operation in most of cases.
  val stream = fc.create(
    tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))

  try {
    // Write the actions produced by this change into the temporary transaction log.
    actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
    stream.close()
    streamClosed = true
    try {
      val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE
      // Rename the temporary log file to the final transaction log file;
      // if the rename fails, throw an exception so that we retry later.
      fc.rename(tempPath, path, renameOpt)
      renameDone = true
    } catch {
      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
        throw new FileAlreadyExistsException(path.toString)
    }
  } finally {
    if (!streamClosed) {
      stream.close()
    }
    // Delete the temporary transaction log.
    if (!renameDone) {
      fc.delete(tempPath, false)
    }
  }
}
The logic of writeInternal is straightforward; it is essentially an ordinary file write:
Get the HDFS FileContext used to write the transaction log.
If the transaction log file to be written already exists, throw an exception so that the write can be retried later. For example, suppose the latest transaction log file on disk is 00000000000000000003.json, so the file we are about to write is 00000000000000000004.json. Because Delta Lake allows multiple concurrent writers, another user may already have written a new 00000000000000000004.json between the moment we read the latest log version and the moment we write; our write must then fail, and a FileAlreadyExistsException is thrown so that it can be retried.
The transaction log is first written to a temporary file under the table's _delta_log directory. For example, if the log file for this commit is 00000000000000000004.json, the data is first written to a file such as .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp.
The transaction records produced by this update are written into that temporary file.
After writing, the temporary log file is renamed to the final log file, e.g. .00000000000000000004.json.0887f7da-5920-4214-bd2e-7c14b4244af1.tmp is renamed to 00000000000000000004.json. Note that other users may still be modifying the table while we write, so 00000000000000000004.json may already have been taken by another commit; in that case a FileAlreadyExistsException is thrown here as well, so that the write can be retried.
That completes the transaction-log write. Back in doCommit: since writeInternal may throw FileAlreadyExistsException, the deltaLog.store.write(xxx) call may throw this exception, and doCommit catches it and calls checkAndRetry(attemptVersion, actions, attemptNumber), which is the retry path that rewrites the transaction log. checkAndRetry is implemented as follows:
protected def checkAndRetry(
    checkVersion: Long,
    actions: Seq[Action],
    attemptNumber: Int): Long = recordDeltaOperation(
      deltaLog,
      "delta.commit.retry",
      tags = Map(TAG_LOG_STORE_CLASS -> deltaLog.store.getClass.getName)) {

  // Read the transaction logs persisted on disk and refresh the in-memory snapshot.
  deltaLog.update()
  // The version to retry is the version of the freshly refreshed snapshot + 1.
  val nextAttempt = deltaLog.snapshot.version + 1

  // Check for logical conflicts with the commits that won.
  (checkVersion until nextAttempt).foreach { version =>
    val winningCommitActions =
      deltaLog.store.read(deltaFile(deltaLog.logPath, version)).map(Action.fromJson)
    val metadataUpdates = winningCommitActions.collect { case a: Metadata => a }
    val txns = winningCommitActions.collect { case a: SetTransaction => a }
    val protocol = winningCommitActions.collect { case a: Protocol => a }
    val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }
      .map(ci => ci.copy(version = Some(version)))
    val fileActions = winningCommitActions.collect { case f: FileAction => f }

    // If the log protocol version was upgraded, make sure we are still okay.
    // Fail the transaction if we're trying to upgrade protocol ourselves.
    if (protocol.nonEmpty) {
      protocol.foreach { p =>
        deltaLog.protocolRead(p)
        deltaLog.protocolWrite(p)
      }
      actions.foreach {
        case Protocol(_, _) => throw new ProtocolChangedException(commitInfo)
        case _ =>
      }
    }

    // Fail if the metadata is different than what the txn read.
    if (metadataUpdates.nonEmpty) {
      throw new MetadataChangedException(commitInfo)
    }

    // Fail if the data is different than what the txn read.
    if (dependsOnFiles && fileActions.nonEmpty) {
      throw new ConcurrentWriteException(commitInfo)
    }

    // Fail if idempotent transactions have conflicted.
    val txnOverlap = txns.map(_.appId).toSet intersect readTxn.toSet
    if (txnOverlap.nonEmpty) {
      throw new ConcurrentTransactionException(commitInfo)
    }
  }

  logInfo(s"No logical conflicts with deltas [$checkVersion, $nextAttempt), retrying.")
  // Retry the transaction-log write.
  doCommit(nextAttempt, actions, attemptNumber + 1)
}
The checkAndRetry function is only invoked when a transaction-log write conflict occurs; its purpose is to rewrite the current transaction log:
Because the previous write conflicted, re-read the transaction logs persisted on disk and refresh the in-memory snapshot;
The version to retry is the version of the freshly refreshed snapshot + 1;
Perform the relevant conflict checks;
Retry the transaction-log write.
Once the transaction log has been successfully persisted, the last step of the commit operation runs: the postCommit function, implemented as follows:
protected def postCommit(commitVersion: Long, commitActions: Seq[Action]): Unit = {
  committed = true
  if (commitVersion != 0 && commitVersion % deltaLog.checkpointInterval == 0) {
    try {
      deltaLog.checkpoint()
    } catch {
      case e: IllegalStateException =>
        logWarning("Failed to checkpoint table state.", e)
    }
  }
}
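postCommit writes a checkpoint every deltaLog.checkpointInterval commits; in the open-source version this interval comes from the table property delta.checkpointInterval (default 10). A hedged sketch of raising it, assuming the SQL extension from section 5 is configured; the table path is illustrative:

// Write a checkpoint every 20 commits instead of the default interval.
spark.sql("""
  ALTER TABLE delta.`/data/events`
  SET TBLPROPERTIES ('delta.checkpointInterval' = '20')
""")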




