暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hudi Spark SQL源码学习总结-CTAS

伦少的博客 2022-08-02
736

前言

上一篇文章Hudi Spark SQL源码学习总结-Create Table总结了Create Table
的源码执行逻辑,这一篇继续总结CTAS
,之所以总结CTAS
,是之前在我提交的一个PR中发现,Spark2和Spark3.2.1版本的CTAS
的逻辑不一样,最终走的Hudi实现类也不一样,所以本文分Spark2和Spark3.2.1两个版本分析

不同点

先总结一下Spark2和Spark3.2.1的整体逻辑的不同点

Spark2:     visitCreateTable
->CreateTable
->CreateHoodieTableAsSelectCommand.run

Spark3.2.1: 前提配置了:spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
,如果没有配置则和Spark2一样
           visitCreateTable
->CreateTableAsSelectStatement
->isV2Provider
->true->CreateTableAsSelect
->HoodieCatalog.createHoodieTable

           visitCreateTable
->CreateTableAsSelectStatement
->isV2Provider
->false->CreateTable
->CreateHoodieTableAsSelectCommand.run

Spark2和Spark3.2.1不同的关键点有两个:

  • 1、配置spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog

  • 2、isV2Provider("hudi")返回ture

只要有一个不满足,Spark3.2.1的逻辑就和Spark2一样,引进HoodieCatalog
和令hudi
为V2Provider的PR为:https://github.com/apache/hudi/pull/4611
目前master最新代码已将spark3.2.1的isV2Provider("hudi")改为了false,也就是Spark2和Saprk3.2.1的逻辑又一致了,PR:https://github.com/apache/hudi/pull/5737

版本

Hudi https://github.com/apache/hudi/pull/5592 本文基于这个PR对应的代码进行调试分析,因为我就是在贡献这个PR时才发现Spark3.2.1和Saprk2的CTAS
的逻辑不同的

示例代码

还是直接拿源码里的TestCreateTable
的测试语句

spark.sql(
        s"""
           | create table $tableName using hudi
           | partitioned by (dt)
           | tblproperties(
           |    hoodie.database.name = "
databaseName",
           |    hoodie.table.name = "
tableName",
           |    primaryKey = 'id',
           |    preCombineField = 'ts',
           |    hoodie.datasource.write.operation = 'upsert',
           |    type = '$tableType'
           | )
           | AS
           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
         "
"".stripMargin
      )

不过需要提一下, 这里的spark是如何创建的,因为在分析Spark3.2.1的逻辑时会用到,先贴在这里:

  protected lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("hoodie sql test")
    .withExtensions(new HoodieSparkSessionExtension)
    .config("spark.serializer""org.apache.spark.serializer.KryoSerializer")
    .config("hoodie.insert.shuffle.parallelism""4")
    .config("hoodie.upsert.shuffle.parallelism""4")
    .config("hoodie.delete.shuffle.parallelism""4")
    .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
    .config("spark.sql.session.timeZone""CTT")
    .config(sparkConf())
    .getOrCreate()

   def sparkConf(): SparkConf = {
    val sparkConf = new SparkConf()
    if (HoodieSparkUtils.gteqSpark3_2) {
      sparkConf.set("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    }
    sparkConf
  }   

打印执行计划

和上篇文章一样我们先打印一下计划,方便我们分析

config("spark.sql.planChangeLog.level""INFO")
val df = spark.sql(ctasSql)
df.explain(true)

和上一篇文章的不同点是加了配置"spark.sql.planChangeLog.level", "INFO"
,之所以上篇文章不加这篇文章加,是因为这个配置在Spark3.1.0才有得,所以对于Spark2的代码不生效,不过在我们分析Spark3.2.1的执行计划会比较有用,另外提一下,开启这个配置是通过logBasedOnLevel(message)
来打印信息的,一共有三个方法调用了logBasedOnLevel
,分别为logRule
:如果rule生效,打印oldPlan
newPlan
logBatch
:打印Batch
的前后信息,logMetrics
:打印整体指标,但是在planner.plan
中没有调用这几个方法,所以对于分析哪些strategies
会生效是没用的,不过对于分析analysis
阶段的哪些规则会生效还是非常有用的

  private def logBasedOnLevel(f: => String): Unit = {
    logLevel match {
      case "TRACE" => logTrace(f)
      case "DEBUG" => logDebug(f)
      case "INFO" => logInfo(f)
      case "WARN" => logWarning(f)
      case "ERROR" => logError(f)
      case _ => logTrace(f)
    }
  }
  def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
    if (!newPlan.fastEquals(oldPlan)) {
      if (logRules.isEmpty || logRules.get.contains(ruleName)) {
        def message(): String = {
          s"""
             |=== Applying Rule $ruleName ===
             |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("
\n")}
           "
"".stripMargin
        }

        logBasedOnLevel(message)
      }
    }
  }
  def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
    if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
      def message(): String = {
        if (!oldPlan.fastEquals(newPlan)) {
          s"""
             |=== Result of Batch $batchName ===
             |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("
\n")}
          "
"".stripMargin
        } else {
          s"Batch $batchName has no effect."
        }
      }

      logBasedOnLevel(message)
    }
  }
  def logMetrics(metrics: QueryExecutionMetrics): Unit = {
    val totalTime = metrics.time / NANOS_PER_SECOND.toDouble
    val totalTimeEffective = metrics.timeEffective / NANOS_PER_SECOND.toDouble
    val message =
      s"""
         |=== Metrics of Executed Rules ===
         |Total number of runs: ${metrics.numRuns}
         |Total time: $totalTime seconds
         |Total number of effective runs: ${metrics.numEffectiveRuns}
         |Total time of effective runs: $totalTimeEffective seconds
      "
"".stripMargin

    logBasedOnLevel(message)
  }  

Spark2

Spark2的逻辑和上一篇文章差不多,由于上一篇已经总结过了,所以本文只讲不同的地方,如果掌握了上一篇文章的逻辑的话,再看CTAS
的逻辑还是比较简单的。

打印信息

== Parsed Logical Plan ==
'CreateTable `h0`, ErrorIfExists
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
   +- OneRowRelation

== Analyzed Logical Plan ==
CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
   +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
      +- OneRowRelation

== Optimized Logical Plan ==
CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
   +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
      +- OneRowRelation

== Physical Plan ==
Execute CreateHoodieTableAsSelectCommand
   +- CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
         +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
            +- OneRowRelation

singleStatement

根据上篇文章中的逻辑,可知这里的CTAS
语句同样对应Spark源码里的SqlBase.g4

singleStatement
    : statement EOF
    ;

statement
    : query                                                            #statementDefault
    | USE db=identifier                                                #use
    | CREATE DATABASE (IF NOT EXISTS)? identifier
        (COMMENT comment=STRING)? locationSpec?
        (WITH DBPROPERTIES tablePropertyList)?                         #createDatabase
    | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList     #setDatabaseProperties
    | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?      #dropDatabase
    | createTableHeader ('(' colTypeList ')')? tableProvider
        ((OPTIONS options=tablePropertyList) |
        (PARTITIONED BY partitionColumnNames=identifierList) |
        bucketSpec |
        locationSpec |
        (COMMENT comment=STRING) |
        (TBLPROPERTIES tableProps=tablePropertyList))*
        (ASquery)?                                                   #createTable
    | createTableHeader ('(' columns=colTypeList ')')?
        ((COMMENT comment=STRING) |
        (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
        bucketSpec |
        skewSpec |
        rowFormat |
        createFileFormat |
        locationSpec |
        (TBLPROPERTIES tableProps=tablePropertyList))*
        (ASquery)?                                                   #createHiveTable
        ......

tableProvider
    : USING qualifiedName
    ;

不过这里有点不同的是:query不为空 (AS? query)  ,所以在visitCreateTable
中返回CreateTable(tableDesc, mode, Some(query))

  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
    if (external) {
      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
    }

    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)

    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
    // provider为hudi
    val provider = ctx.tableProvider.qualifiedName.getText
    val schema = Option(ctx.colTypeList()).map(createSchema)
    val partitionColumnNames =
      Option(ctx.partitionColumnNames)
        .map(visitIdentifierList(_).toArray)
        .getOrElse(Array.empty[String])
    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)

    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
    val storage = DataSource.buildStorageFormatFromOptions(options)

    if (location.isDefined && storage.locationUri.isDefined) {
      throw new ParseException(
        "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
          "you can only specify one of them.", ctx)
    }
    val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI))

    val tableType = if (customLocation.isDefined) {
      CatalogTableType.EXTERNAL
    } else {
      CatalogTableType.MANAGED
    }

    val tableDesc = CatalogTable(
      identifier = table,
      tableType = tableType,
      storage = storage.copy(locationUri = customLocation),
      schema = schema.getOrElse(new StructType),
      provider = Some(provider),
      partitionColumnNames = partitionColumnNames,
      bucketSpec = bucketSpec,
      properties = properties,
      comment = Option(ctx.comment).map(string))

    // Determine the storage mode.
    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

    if (ctx.query != null) 
{
      // Get the backing query.
      val query = plan(ctx.query)

      if (temp) {
        operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
      }

      // Don't allow explicit specification of schema for CTAS
      if (schema.nonEmpty) {
        operationNotAllowed(
          "Schema may not be specified in a Create Table As Select (CTAS) statement",
          ctx)
      }
      CreateTable(tableDesc, mode, Some(query))
    } else {
      if (temp) {
        if (ifNotExists) {
          operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
        }

        logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
          "CREATE TEMPORARY VIEW ... USING ... instead")
        // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support
        // IF NOT EXISTS. Users are not allowed to replace the existing temp table.
        CreateTempViewUsing(table, schema, replace = false, global = false, provider, options)
      } else {
        CreateTable(tableDesc, mode, None)
      }
    }
  }

analysis

那么在analysis
阶段中Hudi的自定义规则customResolutionRules
中的HoodieAnalysis
的apply方法中会被匹配到

case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
  with SparkAdapterSupport 
{

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan match {
      // Convert to MergeIntoHoodieTableCommand
      case m @ MergeIntoTable(target, _, _, _, _)
        if m.resolved && sparkAdapter.isHoodieTable(target, sparkSession) =>
          MergeIntoHoodieTableCommand(m)

      // Convert to UpdateHoodieTableCommand
      case u @ UpdateTable(table, _, _)
        if u.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
          UpdateHoodieTableCommand(u)

      // Convert to DeleteHoodieTableCommand
      case d @ DeleteFromTable(table, _)
        if d.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
          DeleteHoodieTableCommand(d)

      // Convert to InsertIntoHoodieTableCommand
      case l if sparkAdapter.isInsertInto(l) =>
        val (table, partition, query, overwrite, _) = sparkAdapter.getInsertIntoChildren(l).get
        table match {
          case relation: LogicalRelation if sparkAdapter.isHoodieTable(relation, sparkSession) =>
            new InsertIntoHoodieTableCommand(relation, query, partition, overwrite)
          case _ =>
            l
        }

      // Convert to CreateHoodieTableAsSelectCommand
      case CreateTable(table, mode, Some(query))
        if query.resolved && sparkAdapter.isHoodieTable(table) 
=>
          CreateHoodieTableAsSelectCommand(table, mode, query)

      // Convert to CompactionHoodieTableCommand
      case CompactionTable(table, operation, options)
        if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) 
=>
        val tableId = getTableIdentifier(table)
        val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
        CompactionHoodieTableCommand(catalogTable, operation, options)
      // Convert to CompactionHoodiePathCommand
      case CompactionPath(path, operation, options) =>
        CompactionHoodiePathCommand(path, operation, options)
      // Convert to CompactionShowOnTable
      case CompactionShowOnTable(table, limit)
        if sparkAdapter.isHoodieTable(table, sparkSession) 
=>
        val tableId = getTableIdentifier(table)
        val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
        CompactionShowHoodieTableCommand(catalogTable, limit)
      // Convert to CompactionShowHoodiePathCommand
      case CompactionShowOnPath(path, limit) =>
        CompactionShowHoodiePathCommand(path, limit)
      // Convert to HoodieCallProcedureCommand
      case c@CallCommand(_, _) =>
        val procedure: Option[Procedure] = loadProcedure(c.name)
        val input = buildProcedureArgs(c.args)
        if (procedure.nonEmpty) {
          CallProcedureHoodieCommand(procedure.get, input)
        } else {
          c
        }
      case _ => plan
    }
  }

这里转化为CreateHoodieTableAsSelectCommand
,它和CreateHoodieTableCommand
一样是是Command
的子类

case class CreateHoodieTableAsSelectCommand(
   table: CatalogTable,
   mode: SaveMode,
   query: LogicalPlan)
 extends HoodieLeafRunnableCommand 
{
  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    assert(table.tableType != CatalogTableType.VIEW)
    assert(table.provider.isDefined)

    val sessionState = sparkSession.sessionState
    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableIdentWithDB = table.identifier.copy(database = Some(db))
    val tableName = tableIdentWithDB.unquotedString

    if (sessionState.catalog.tableExists(tableIdentWithDB)
{
      assert(mode != SaveMode.Overwrite,
        s"Expect the table $tableName has been dropped when the save mode is Overwrite")

      if (mode == SaveMode.ErrorIfExists) {
        throw new RuntimeException(s"Table $tableName already exists. You need to drop it first.")
      }
      if (mode == SaveMode.Ignore) {
        // Since the table already exists and the save mode is Ignore, we will just return.
        // scalastyle:off
        return Seq.empty
        // scalastyle:on
      }
    }
    ......

所以会最终会调用ExecutedCommandExec.executeCollect
,触发CreateHoodieTableAsSelectCommand
重写的run方法,实现Hudi自己的逻辑,Hudi自己的逻辑可以在Hudi源码里调试跟踪,本文就不总结了。

Spark3.2.1

Hudi支持不同的Spark版本,默认是Spark2.4.4,要想使用Spark3.2.1版本,可以通过如下命令编译打包:

mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12

要想调试Spark3.2.1,可以根据上面的命令先打包或者install到本地,再新建一个测试项目引用我们自己打的包用来调试,也可以直接在Hudi源码里修改配置Idea Spark3.2.1的环境,不过比较麻烦,本人用的第二种方法,同样的这里也只讲关键的不同点

打印信息

Spark3.2.1的打印信息会比Spark2更全一点,我们可以看到最终的Physical Plan
AtomicCreateTableAsSelect
HoodieCatalog
有关

== Parsed Logical Plan ==
'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
   +- OneRowRelation

== Analyzed Logical Plan ==
CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
   +- OneRowRelation

== Optimized Logical Plan ==
CommandResult AtomicCreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, owner=dongkelun01, type=cow, preCombineField=ts], [], false
   +- CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
      +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
         +- OneRowRelation

== Physical Plan ==
CommandResult <empty>
   +- AtomicCreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, owner=dongkelun01, type=cow, preCombineField=ts], [], false
      +- *(1) Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
         +- *(1) Scan OneRowRelation[]

PlanChangeLogger的日志比较多,会打印哪些规则没有生效,哪些规则生效了,具体怎么生效的等,由于比较多,这里只贴一小部分

7720 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Substitution has no effect.
7721 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Disable Hints has no effect.
7724 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Hints has no effect.
7728 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Simple Sanity Check has no effect.
8309 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - 
=== Applying Rule org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog ===
!'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false   CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@c3719e5, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
 +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]                                                                                                                                   +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
    +- OneRowRelation                                                                                                                                                                                                       +- OneRowRelation

8331 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - 
=== Result of Batch Resolution ===
!'
CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, falsefalse   CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@c3719e5, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
 +- Project [1 AS id#0, a1 AS name#110 AS price#22021-04-01 AS dt#31000 AS ts#4]                                                                                                                                   +- Project [1 AS id#0, a1 AS name#110 AS price#22021-04-01 AS dt#31000 AS ts#4]
    +- OneRowRelation                                                                                                                                                                                                       +- OneRowRelation

8334 [ScalaTest-run-running-TestCreateTable] INFO  org.apache.spark.sql.catalyst.rules.PlanChangeLogger  - Batch Remove TempResolvedColumn has no effect.
......
=== Metrics of Executed Rules ===
Total number of runs: 141
Total time: 0.6459626 seconds
Total number of effective runs: 2
Total time of effective runs: 0.302878 seconds 

由于比较长,导致换行,oldPan和newPlan的对比效果不是很明显,不过可以大概看出来前后变化就行,也可以自己调试对比

ANTLR

上一篇讲到Hudi有三个g4
文件,一个在hudi-spark模块下,另外两个在hudi-spark2模块下,同样的在hudi-spark3模块下也有两个同名的g4
文件,不过内容和Spark2的不一样,具体为:

  • hudi-spark模块下的 HoodieSqlCommon.g4

  • hudi-spark3模块下的 SqlBase.g4
    ,拷贝自的Spark3.2.0源码里的SqlBase.g4

  • hudi-spark3模块下的 HoodieSqlBase.g4
     其中导入了上面的SqlBase.g4

HoodieSqlBase.g4

grammar HoodieSqlBase;

import SqlBase;

singleStatement
    : statement EOF
    ;

statement
    : query                                                            #queryStatement
    | ctes? dmlStatementNoWith                                         #dmlStatement
    | createTableHeader ('(' colTypeList ')')? tableProvider?
        createTableClauses
        (AS? query)?                                                   #createTable
    | .*?                                                              #passThrough
    ;

parsing

同样的parsePlan
首先调用HoodieCommonSqlParser.parsePlan
,这个是公共的,和Spark2一样,返回null,调用sparkExtendedParser.parsePlan

  private lazy val builder = new HoodieSqlCommonAstBuilder(session, delegate)
  private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
    .map(_(session, delegate)).getOrElse(delegate)

  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    builder.visit(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _=> sparkExtendedParser.parsePlan(sqlText)
    }
  }

上一篇讲到Spark2的sparkExtendedParser
HoodieSpark2ExtendedSqlParser
,那么Spark3的是啥呢?很简单和Spark2的逻辑一样,来看一下:

  private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
    .map(_(session, delegate)).getOrElse(delegate)

  lazy val sparkAdapter: SparkAdapter = {
    val adapterClass = if (HoodieSparkUtils.isSpark3_2) {
      "org.apache.spark.sql.adapter.Spark3_2Adapter"
    } else if (HoodieSparkUtils.isSpark3_0 || HoodieSparkUtils.isSpark3_1) {
      "org.apache.spark.sql.adapter.Spark3_1Adapter"
    } else {
      "org.apache.spark.sql.adapter.Spark2Adapter"
    }
    getClass.getClassLoader.loadClass(adapterClass)
      .newInstance().asInstanceOf[SparkAdapter]
  }

根据上面的代码可知sparkAdapter
Spark3_2Adapter
,接着看一下Spark3_2Adapter.createExtendedSparkParser

  override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
    Some(
      (spark: SparkSession, delegate: ParserInterface) => new HoodieSpark3_2ExtendedSqlParser(spark, delegate)
    )
  }

所以这里的Spark3的sparkExtendedParser
HoodieSpark3_2ExtendedSqlParser
,接着到了HoodieSpark3_2ExtendedSqlParser.parsePlan
,这里和Spark2的逻辑不一样

  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    builder.visit(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _=> delegate.parsePlan(sqlText)
    }
  }

  protected def parse[T](command: String)(toResult: HoodieSqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")

    val lexer = new HoodieSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new HoodieSqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    parser.removeErrorListeners()
    parser.addErrorListener(ParseErrorListener)
//    parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
    parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
    parser.SQL_standard_keyword_behavior = conf.ansiEnabled

    try {
      try {
        // first, try parsing with potentially faster SLL mode
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
      }
      catch {
        case e: ParseCancellationException =>
          // if we fail, parse with LL mode
          tokenStream.seek(0// rewind input stream
          parser.reset()

          // Try Again.
          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
          toResult(parser)
      }
    }
    catch {
      case e: ParseException if e.command.isDefined =>
        throw e
      case e: ParseException =>
        throw e.withCommand(command)
      case e: AnalysisException =>
        val position = Origin(e.line, e.startPosition)
        throw new ParseException(Option(command), e.message, position, position)
    }
  }

可以看到这里parser
同样为HoodieSqlBaseParser
,但是对于builder.visit(parser.singleStatement())
,Spark3和Spark2是不一样的,为啥不一样?我们接着往下看:
我们在Spark3的HoodieSqlBase.g4
中可以看到statement
中是有#createTable

    | createTableHeader ('(' colTypeList ')')? tableProvider?
        createTableClauses
        (AS? query)?                                                   #createTable

其中 createTableHeader
tableProvider
AS
query
都是引自SqlBase.g4
,所以这里的CTAS
能匹配上,这里的parser.singleStatement()
和之前讲的一样,最终都会调用builder.visitCreateTable
,不同的是,这里的builder
HoodieSpark3_2ExtendedSqlAstBuilder
,所以需要看一下它的visitCreateTable
有何不同

  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)

    val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
    val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
      visitCreateTableClauses(ctx.createTableClauses())

    if (provider.isDefined && serdeInfo.isDefined) {
      operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
    }

    if (temp) {
      val asSelect = if (ctx.query == null"" else " AS ..."
      operationNotAllowed(
        s"CREATE TEMPORARY TABLE ...$asSelect, use CREATE TEMPORARY VIEW instead", ctx)
    }

    val partitioning = partitionExpressions(partTransforms, partCols, ctx)

    Option(ctx.query).map(plan) match {
      case Some(_) if columns.nonEmpty =>
        operationNotAllowed(
          "Schema may not be specified in a Create Table As Select (CTAS) statement",
          ctx)

      case Some(_) if partCols.nonEmpty =>
        // non-reference partition columns are not allowed because schema can't be specified
        operationNotAllowed(
          "Partition column types may not be specified in Create Table As Select (CTAS)",
          ctx)

      case Some(query) =>
        CreateTableAsSelectStatement(
          table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
          writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)

      case _ =>
        // Note: table schema includes both the table columns list and the partition columns
        // with data type.
        val schema = StructType(columns ++ partCols)
        CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
          options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
    }
  }

这里会匹配到case Some(query)
,返回CreateTableAsSelectStatement
,这就是和Spark2(或者说Spark源码里的visitCreateTable)不同之处,Spark2返回的是CreateTable(tableDesc, mode, Some(query))
,那么又是在哪里对CreateTableAsSelectStatement
进行处理的呢

ResolveCatalogs 和 ResolveSessionCatalog

有两个规则类会匹配CreateTableAsSelectStatement
,ResolveCatalogs
是在Analyzer
batches
中定义的,ResolveSessionCatalog
是在BaseSessionStateBuilder.analyzer
重写的的extendedResolutionRules
中定义的(我们在PlanChangeLogger的日志中可知,真正起作用的是ResolveSessionCatalog

override def batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      // This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
      // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
      // very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
      // at the beginning of analysis.
      OptimizeUpdateFields,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      SubstituteUnresolvedOrdinals),
    Batch("Disable Hints", Once,
      new ResolveHints.DisableHints),
    Batch("Hints", fixedPoint,
      ResolveHints.ResolveJoinStrategyHints,
      ResolveHints.ResolveCoalesceHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions(v1SessionCatalog) ::
      ResolveNamespace(catalogManager) ::
      new ResolveCatalogs(catalogManager) ::
      ResolveUserSpecifiedColumns ::
      ResolveInsertInto ::
      ResolveRelations ::
      ......
      extendedResolutionRules : _*),
      .......

  protected def analyzer: Analyzer = new Analyzer(catalogManager) {
    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
      new FindDataSourceTable(session) +:
        new ResolveSQLOnFile(session) +:
        new FallBackFileSourceV2(session) +:
        ResolveEncodersInScalaAgg +:
        new ResolveSessionCatalog(catalogManager) +:
        ResolveWriteToStream +:
        customResolutionRules

    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
      DetectAmbiguousSelfJoin +:
        PreprocessTableCreation(session) +:
        PreprocessTableInsertion +:
        DataSourceAnalysis +:
        customPostHocResolutionRules

    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
      PreWriteCheck +:
        PreReadCheck +:
        HiveOnlyCheck +:
        TableCapabilityCheck +:
        CommandCheck +:
        customCheckRules
  }

他们两个的apply方法分别为:

class ResolveCatalogs(val catalogManagerCatalogManager)
  extends Rule[LogicalPlanwith LookupCatalog 
{
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
  import org.apache.spark.sql.connector.catalog.CatalogV2Util._

  override def apply(plan: LogicalPlan): LogicalPlan 
= plan resolveOperators {
    case c @ CreateTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
      CreateV2Table(
        catalog.asTableCatalog,
        tbl.asIdentifier,
        c.tableSchema,
        // convert the bucket spec and add it as a transform
        c.partitioning ++ c.bucketSpec.map(_.asTransform),
        convertTableProperties(c),
        ignoreIfExists = c.ifNotExists)

    case c @ CreateTableAsSelectStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
      CreateTableAsSelect(
        catalog.asTableCatalog,
        tbl.asIdentifier,
        // convert the bucket spec and add it as a transform
        c.partitioning ++ c.bucketSpec.map(_.asTransform),
        c.asSelect,
        convertTableProperties(c),
        writeOptions = c.writeOptions,
        ignoreIfExists = c.ifNotExists)

class ResolveSessionCatalog(val catalogManagerCatalogManager)
  extends Rule[LogicalPlanwith LookupCatalog 
{
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
  import org.apache.spark.sql.connector.catalog.CatalogV2Util._
  import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._

  override def apply(plan: LogicalPlan): LogicalPlan 
= plan.resolveOperatorsUp {
    case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
      cols.foreach { c =>
        assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
        if (!c.nullable) {
          throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError
        }
      }
      AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))

    ......

    case c @ CreateTableAsSelectStatement(
         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
      val (storageFormat, provider) = getStorageFormatAndProvider(
        c.provider, c.options, c.location, c.serde, ctas = true)
      if (!isV2Provider(provider)) {
        val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
          c.partitioning, c.bucketSpec, c.properties, provider, c.location,
          c.comment, storageFormat, c.external)
        val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
        CreateTable(tableDesc, mode, Some(c.asSelect))
      } else 
{
        CreateTableAsSelect(
          catalog.asTableCatalog,
          tbl.asIdentifier,
          // convert the bucket spec and add it as a transform
          c.partitioning ++ c.bucketSpec.map(_.asTransform),
          c.asSelect,
          convertTableProperties(c),
          writeOptions = c.writeOptions,
          ignoreIfExists = c.ifNotExists)
      }
    ......

可以看到这两个规则都试图去匹配CreateTableAsSelectStatement
,区别是一个匹配NonSessionCatalogAndTable
,另一个匹配SessionCatalogAndTable
,根据名字判断,他们两个的判断是反过来的,总有一个会匹配上,那么这俩具体实现是啥呢,我看以SessionCatalogAndTable
为例,我们知道scala在匹配样例类对象时回去调用它的unapply
方法,这里的参数nameParts
CreateTableAsSelectStatement
的第一个参数,在上面的visitCreateTable
可知,它是由visitCreateTableHeader(ctx.createTableHeader)
返回的table

  object SessionCatalogAndTable {
    def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])= nameParts match {
      case SessionCatalogAndIdentifier(catalog, ident) =>
        Some(catalog -> ident.asMultipartIdentifier)
      case _ => None
    }
  }

  object SessionCatalogAndIdentifier {

    def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)= parts match {
      case CatalogAndIdentifier(catalog, ident) if CatalogV2Util.isSessionCatalog(catalog) =>
        Some(catalog, ident)
      case _ => None
    }
  }

    object CatalogAndIdentifier {
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

    private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)

    def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)= {
      assert(nameParts.nonEmpty)
      if (nameParts.length == 1) { // nameParts.length等于1,代表只有表名,没有库名
        Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
      } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) { //nameParts.head为库名,判断库名是否等于globalTempDB
        // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
        // API does not support view yet, and we have to use v1 commands to deal with global temp
        // views. To simplify the implementation, we put global temp views in a special namespace
        // in the session catalog. The special namespace has higher priority during name resolution.
        // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
        // this custom catalog can't be accessed.
        Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
      } else {
        try {
          // 否则,通过catalogManager.catalog(nameParts.head)获取catalog
          Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
        } catch {
          case _: CatalogNotFoundException =>
            Some((currentCatalog, nameParts.asIdentifier))
        }
      }
    }
  } 

    def currentCatalog: CatalogPlugin = catalogManager.currentCatalog 

最终会调用到CatalogAndIdentifier.unapply
,因为我们建表语句中没有加库名限定,所以这里的nameParts
Seq(tableName)
,也就是length等于,所以返回Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
,值为:Some((HoodieCatalog, Identifier.of(default., tableName)),具体为啥为HoodieCatalog
,原因和我们在开头提到的一个配置有关

   def sparkConf(): SparkConf = {
    val sparkConf = new SparkConf()
    if (HoodieSparkUtils.gteqSpark3_2) {
      sparkConf.set("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    }
    sparkConf
  }   

再看CatalogV2Util.isSessionCatalog

  def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
    catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
  }

private[sql] object CatalogManager {
  val SESSION_CATALOG_NAME: String = "spark_catalog"
}

这里的HoodieCatalog.name()
是在V2SessionCatalog
实现的

class V2SessionCatalog(catalogSessionCatalog)
  extends TableCatalog with SupportsNamespaces with SQLConfHelper 
{
  import V2SessionCatalog._

  override val defaultNamespace: Array[String] = Array("default")

  override def name: String = CatalogManager.SESSION_CATALOG_NAME

所以CatalogV2Util.isSessionCatalog(catalog)
为ture,NonSessionCatalogAndTable
正好相反,!CatalogV2Util.isSessionCatalog(catalog)
返回 false

 object NonSessionCatalogAndTable {
    def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])= nameParts match {
      case NonSessionCatalogAndIdentifier(catalog, ident) =>
        Some(catalog -> ident.asMultipartIdentifier)
      case _ => None
    }
  }

    object NonSessionCatalogAndIdentifier {
    def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)= parts match {
      case CatalogAndIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
        Some(catalog, ident)
      case _ => None
    }
  }

所以最终在ResolveSessionCatalog
中匹配成功

case c @ CreateTableAsSelectStatement(
         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
      val (storageFormat, provider) = getStorageFormatAndProvider(
        c.provider, c.options, c.location, c.serde, ctas = true)
      if (!isV2Provider(provider)) {
        val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
          c.partitioning, c.bucketSpec, c.properties, provider, c.location,
          c.comment, storageFormat, c.external)
        val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
        CreateTable(tableDesc, mode, Some(c.asSelect))
      } else 
{
        CreateTableAsSelect(
          catalog.asTableCatalog,
          tbl.asIdentifier,
          // convert the bucket spec and add it as a transform
          c.partitioning ++ c.bucketSpec.map(_.asTransform),
          c.asSelect,
          convertTableProperties(c),
          writeOptions = c.writeOptions,
          ignoreIfExists = c.ifNotExists)
      }

这一块的逻辑是判断是否为V2Provider
,如果不是的话返回CreateTable
,是的话返回CreateTableAsSelect
,关于判断是否为V2Provider
的逻辑比较多,这里先不讲,我们放在后面讲,我们这个版本的代码,是V2Provider
,所以返回CreateTableAsSelect
,这就是和Spark2不同的关键点,如果不是V2Provider
,那么和Saprk2一样返回CreateTable
。我们会在下面讲到:CreateTableAsSelect
最终调用HoodieCatalog
创建Hudi表,而CreateTable
我们在上面讲Spark2时已知最终调用CreateHoodieTableAsSelectCommand

case class CreateTableAsSelect(
    catalog: TableCatalog,
    tableName: Identifier,
    partitioning: Seq[Transform],
    query: LogicalPlan,
    properties: Map[String, String],
    writeOptions: Map[String, String],
    ignoreIfExists: Boolean)
 extends UnaryCommand with V2CreateTablePlan 
{

trait UnaryCommand extends Command with UnaryLike[LogicalPlan]      

那么CreateTableAsSelect
又是在哪里被转化,最后调用Hudi的逻辑的呢?

planning

  @transient private[sql] val logicalPlan: LogicalPlan = {
    val plan = queryExecution.commandExecuted
    if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)
{
      val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
      dsIds.add(id)
      plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
    }
    plan
  }

  lazy val commandExecuted: LogicalPlan = mode match {
    case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
    case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
    case CommandExecutionMode.SKIP => analyzed
  }

  private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
    case c: Command =>
      val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
      val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {
        qe.executedPlan.executeCollect()
      }
      CommandResult(
        qe.analyzed.output,
        qe.commandExecuted,
        qe.executedPlan,
        result)
    case other => other
  }

CreateTableAsSelect
也为Command
类型,跟上一篇文章讲的Spark2一样,Spark3也在new DataSet
时通过变量logicalPlan
,触发后面的planning
阶段

DataSourceV2Strategy

SparkPlanner
strategies
里有一个DataSourceV2Strategy

class SparkPlanner(val sessionSparkSessionval experimentalMethodsExperimentalMethods)
  extends SparkStrategies with SQLConfHelper 
{

  def numPartitions: Int = conf.numShufflePartitions

  override def strategies: Seq[Strategy] =
    experimentalMethods.extraStrategies ++
      extraPlanningStrategies ++ (
      LogicalQueryStageStrategy ::
      PythonEvals ::
      new DataSourceV2Strategy(session) ::
      FileSourceStrategy ::
      DataSourceStrategy ::
      SpecialLimits ::
      Aggregation ::
      Window ::
      JoinSelection ::
      InMemoryScans ::
      SparkScripts ::
      BasicOperators :: Nil)

看一下它的apply方法

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(project, filters,
    ......
    case WriteToDataSourceV2(relationOpt, writer, query, customMetrics)
 
=>
      val invalidateCacheFunc: () => Unit = () => relationOpt match {
        case Some(r) => session.sharedState.cacheManager.uncacheQuery(session, r, cascade = true)
        case None => ()
      }
      WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil

    case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) 
=>
      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
      CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil

    case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) 
=>
      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
      val writeOptions = new CaseInsensitiveStringMap(options.asJava)
      catalog match {
        case staging: StagingTableCatalog =>
          AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
            propsWithOwner, writeOptions, ifNotExists) :: Nil
        case _ =>
          CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
            propsWithOwner, writeOptions, ifNotExists) :: Nil
      }
    ......

这里会匹配到CreateTableAsSelect
,然后根据catalog
的类型选择返回AtomicCreateTableAsSelectExec
或者CreateTableAsSelectExec
,那么需要看一下catalog
是否为StagingTableCatalog

HoodieCatalog

class HoodieCatalog extends DelegatingCatalogExtension
  with StagingTableCatalog
  with SparkAdapterSupport
  with ProvidesHoodieConfig 
{

  val spark: SparkSession = SparkSession.active

  override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable 
= {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
    } else {
      BasicStagedTable(
        ident,
        super.createTable(ident, schema, partitions, properties),
        this)
    }
  }

  override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
    } else {
      super.dropTable(ident)
      BasicStagedTable(
        ident,
        super.createTable(ident, schema, partitions, properties),
        this)
    }
  }

  override def stageCreateOrReplace(ident: Identifier,
                                    schema: StructType,
                                    partitions: Array[Transform],
                                    properties: util.Map[String, String])
: StagedTable 
= {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(
        ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
    } else {
      try super.dropTable(ident) catch {
        case _: NoSuchTableException => // ignore the exception
      }
      BasicStagedTable(
        ident,
        super.createTable(ident, schema, partitions, properties),
        this)
    }
  }

根据HoodieCatalog
的定义我们知道它实现了StagingTableCatalog
,所以返回AtomicCreateTableAsSelectExec

AtomicCreateTableAsSelectExec

case class AtomicCreateTableAsSelectExec(
    catalog: StagingTableCatalog,
    ident: Identifier,
    partitioning: Seq[Transform],
    plan: LogicalPlan,
    query: SparkPlan,
    properties: Map[String, String],
    writeOptions: CaseInsensitiveStringMap,
    ifNotExists: Boolean)
 extends TableWriteExecHelper 
{

  override protected def run(): Seq[InternalRow] = {
    if (catalog.tableExists(ident)) {
      if (ifNotExists) {
        return Nil
      }

      throw QueryCompilationErrors.tableAlreadyExistsError(ident)
    }
    val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
    val stagedTable = catalog.stageCreate(
      ident, schema, partitioning.toArray, properties.asJava)
    writeToTable(catalog, stagedTable, writeOptions, ident)
  }

可以看到AtomicCreateTableAsSelectExec
有一个run
方法,那么它的run
方法是哪里触发的呢?

V2CommandExec

其实AtomicCreateTableAsSelectExec
V2CommandExec
的子类

case class AtomicCreateTableAsSelectExec(
    catalog: StagingTableCatalog,
    ident: Identifier,
    partitioning: Seq[Transform],
    plan: LogicalPlan,
    query: SparkPlan,
    properties: Map[String, String],
    writeOptions: CaseInsensitiveStringMap,
    ifNotExists: Boolean)
 extends TableWriteExecHelper 
{

private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {

trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {

AtomicCreateTableAsSelectExec
executeCollect
是在V2CommandExec
实现的

abstract class V2CommandExec extends SparkPlan {

  /**
   * Abstract method that each concrete command needs to implement to compute the result.
   */

  protected def run(): Seq[InternalRow]

  /**
   * The value of this field can be used as the contents of the corresponding RDD generated from
   * the physical plan of this command.
   */

  private lazy val result: Seq[InternalRow] 
= run()

  /**
   * The `execute()` method of all the physical command classes should reference `result`
   * so that the command can be executed eagerly right after the command query is created.
   */

  override def executeCollect(): Array[InternalRow] = result.toArray

在上面eagerlyExecuteCommands
中会调用executeCollect
方法,继而调用AtomicCreateTableAsSelectExec
run
方法,那么这个run
方法是实现hudi逻辑的地方吗?

  override protected def run(): Seq[InternalRow] = {
    if (catalog.tableExists(ident)) {
      if (ifNotExists) {
        return Nil
      }

      throw QueryCompilationErrors.tableAlreadyExistsError(ident)
    }
    val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
    val stagedTable = catalog.stageCreate(
      ident, schema, partitioning.toArray, properties.asJava)
    writeToTable(catalog, stagedTable, writeOptions, ident)
  }

我们看到有两个地方和HoodieCatalog
有关,一个是catalog.stageCreate
,这里判断是否为Hudi表,是的话返回HoodieStagedTable
,所以这里的stagedTable
HoodieStagedTable

  override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
    } else {
      BasicStagedTable(
        ident,
        super.createTable(ident, schema, partitions, properties),
        this)
    }
  }

另一个是writeToTable
,它是在父接口TableWriteExecHelper
实现的

private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {
  protected def writeToTable(
      catalog: TableCatalog,
      table: Table,
      writeOptions: CaseInsensitiveStringMap,
      ident: Identifier)
: Seq[InternalRow] 
= {
    Utils.tryWithSafeFinallyAndFailureCallbacks({
      table match {
        case table: SupportsWrite =>
          val info = LogicalWriteInfoImpl(
            queryId = UUID.randomUUID().toString,
            query.schema,
            writeOptions)
          val writeBuilder = table.newWriteBuilder(info)

          val write = writeBuilder.build()
          val writtenRows = write match {
            case v1: V1Write => writeWithV1(v1.toInsertableRelation)
            case v2 => writeWithV2(v2.toBatch)
          }

          table match {
            case st: StagedTable => st.commitStagedChanges()
            case _ =>
          }
          writtenRows

        case _ =>
          // Table does not support writes - staged changes are also rolled back below if table
          // is staging.
          throw QueryExecutionErrors.unsupportedTableWritesError(ident)
      }
    })(catchBlock = {
      table match {
        // Failure rolls back the staged writes and metadata changes.
        case st: StagedTable => st.abortStagedChanges()
        case _ => catalog.dropTable(ident)
      }
    })
  }
}

无论是HoodieStagedTable
还是BasicStagedTable
,都既实现了SupportsWrite
,也实现了StagedTable
,所以会先匹配上SupportsWrite
,然后再匹配上StagedTable
,最终调用commitStagedChanges
,其他的细节比如writeBuilder
writeWithV1
本文先不研究

case class HoodieStagedTable(ident: Identifier,
                             catalog: HoodieCatalog,
                             override val schema: StructType,
                             partitions: Array[Transform],
                             override val properties: util.Map[String, String],
                             mode: TableCreationMode)
 extends StagedTable with SupportsWrite 
{

  private var sourceQuery: Option[DataFrame] = None
  private var writeOptions: Map[String, String] = Map.empty

  override def commitStagedChanges(): Unit 
= {
    val props = new util.HashMap[String, String]()
    val optionsThroughProperties = properties.asScala.collect {
      case (k, _) if k.startsWith("option.") => k.stripPrefix("option.")
    }.toSet
    val sqlWriteOptions = new util.HashMap[String, String]()
    properties.asScala.foreach { case (k, v) =>
      if (!k.startsWith("option.") && !optionsThroughProperties.contains(k)) {
        props.put(k, v)
      } else if (optionsThroughProperties.contains(k)) {
        sqlWriteOptions.put(k, v)
      }
    }
    if (writeOptions.isEmpty && !sqlWriteOptions.isEmpty) {
      writeOptions = sqlWriteOptions.asScala.toMap
    }
    props.putAll(properties)
    props.put("hoodie.table.name", ident.name())
    props.put(RECORDKEY_FIELD.key, properties.get("primaryKey"))
    catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode)
  }

  override def name(): String = ident.name()

  override def abortStagedChanges(): Unit = {
    clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration)
  }

  private def clearTablePath(tablePath: String, conf: Configuration): Unit = {
    val path = new Path(tablePath)
    val fs = path.getFileSystem(conf)
    fs.delete(path, true)
  }

  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.V1_BATCH_WRITE).asJava

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder 
= {
    writeOptions = info.options.asCaseSensitiveMap().asScala.toMap
    new HoodieV1WriteBuilder
  }

  /*
   * WriteBuilder for creating a Hoodie table.
   */

  private class HoodieV1WriteBuilder extends WriteBuilder {
    override def build(): V1Write new V1Write {
      override def toInsertableRelation(): InsertableRelation = {
        new InsertableRelation {
          override def insert(data: DataFrame, overwrite: Boolean): Unit = {
            sourceQuery = Option(data)
          }
        }
      }
    }
  }

}

commitStagedChanges
方法中会调用catalog.createHoodieTable
创建Hudi表,实现Hudi自己的逻辑,这样就是为啥在我开头提到的PR中为啥修改CTAS
的代码会涉及HoodieCatalog
类了

  def createHoodieTable(ident: Identifier,
                        schema: StructType,
                        partitions: Array[Transform],
                        allTableProperties: util.Map[String, String],
                        writeOptions: Map[String, String],
                        sourceQuery: Option[DataFrame],
                        operation: TableCreationMode)
: Table 
= {

    val (partitionColumns, maybeBucketSpec) = convertTransforms(partitions)
    val newSchema = schema
    val newPartitionColumns = partitionColumns
    val newBucketSpec = maybeBucketSpec

    val isByPath = isPathIdentifier(ident)

    val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location"))
    val id = ident.asTableIdentifier

    val locUriOpt = location.map(CatalogUtils.stringToURI)
    val existingTableOpt = getExistingTableIfExists(id)
    val loc = locUriOpt
      .orElse(existingTableOpt.flatMap(_.storage.locationUri))
      .getOrElse(spark.sessionState.catalog.defaultTablePath(id))
    val storage = DataSource.buildStorageFormatFromOptions(writeOptions.--(needFilterProps))
      .copy(locationUri = Option(loc))
    val tableType =
      if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
    val commentOpt = Option(allTableProperties.get("comment"))

    val tablePropertiesNew = new util.HashMap[String, String](allTableProperties)

isV2Provider

来看一下isV2Provider
方法

  private def isV2Provider(provider: String): Boolean = {
    // Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to
    // `HiveFileFormat`, when running tests in sql/core.
    if (DDLUtils.isHiveTable(Some(provider))) return false
    DataSource.lookupDataSourceV2(provider, conf) match {
      // TODO(SPARK-28396): Currently file source v2 can't work with tables.
      case Some(_: FileDataSourceV2) => false
      case Some(_) 
=> true
      case _ => false
    }
  }

  def isHiveTable(provider: Option[String]): Boolean = {
    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
  }
  val HIVE_PROVIDER = "hive"  

这里的参数provider
为hudi,所以isHiveTable
是false,接下来看一下lookupDataSourceV2

  /**
   * Returns an optional [[TableProvider]] instance for the given provider. It returns None if
   * there is no corresponding Data Source V2 implementation, or the provider is configured to
   * fallback to Data Source V1 code path.
   */

  def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {
    val useV1Sources = conf.getConf(SQLConf.USE_V1_SOURCE_LIST).toLowerCase(Locale.ROOT)
      .split(",").map(_.trim)
    val cls = lookupDataSource(provider, conf)
    cls.newInstance() match {
      case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
      case t: TableProvider
          if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
        Some(t)
      case _ => None
    }
  }

    val USE_V1_SOURCE_LIST = buildConf("spark.sql.sources.useV1SourceList")
    .internal()
    .doc("A comma-separated list of data source short names or fully qualified data source " +
      "implementation class names for which Data Source V2 code path is disabled. These data " +
      "sources will fallback to Data Source V1 code path.")
    .version("3.0.0")
    .stringConf
    .createWithDefault("avro,csv,json,kafka,orc,parquet,text")

这里的useV1Sources
默认为"avro,csv,json,kafka,orc,parquet,text",继续看lookupDataSource
,其中的provider1 = hudi, provider2 = hudi.DefaultSource,然后加载所有的META-INF/services/org.apache.spark.sql.sources.DataSourceRegister,再返回里面的内容,和Hudi相关的有org.apache.hudi.DefaultSource
org.apache.hudi.Spark3DefaultSource
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
等,然后过滤shortName=hudi的,只有Spark3DefaultSource满足,所以直接返回Spark3DefaultSource

  /** Given a provider name, look up the data source class definition. */
  def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    // provider1 = hudi
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcDataSourceV2].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    // provider2 = hudi.DefaultSource
    val provider2 = s"$provider1.DefaultSource"
    val loader = Utils.getContextOrSparkClassLoader
    // 这里是去加载所有的META-INF/services/org.apache.spark.sql.sources.DataSourceRegister,然后返回里面的内容
    // 其中有`org.apache.hudi.DefaultSource` `org.apache.hudi.Spark3DefaultSource` `org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat` 等
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

    try {
      // 过滤shortName=hudi的,只有Spark3DefaultSource满足,所以直接返回Spark3DefaultSource
      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
        // the provider format did not match any given registered aliases
        case Nil =>
          try {
            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
              case Success(dataSource) =>
                // Found the data source using fully qualified path
                dataSource
              case Failure(error) 
=>
                if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
                  throw QueryCompilationErrors.orcNotUsedWithHiveEnabledError()
                } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
                  provider1 == "com.databricks.spark.avro" ||
                  provider1 == "org.apache.spark.sql.avro") {
                  throw QueryCompilationErrors.failedToFindAvroDataSourceError(provider1)
                } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
                  throw QueryCompilationErrors.failedToFindKafkaDataSourceError(provider1)
                } else {
                  throw QueryExecutionErrors.failedToFindDataSourceError(provider1, error)
                }
            }
          } catch {
            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
              // NoClassDefFoundError's class name uses "/" rather than "." for packages
              val className = e.getMessage.replaceAll("/"".")
              if (spark2RemovedClasses.contains(className)) {
                throw QueryExecutionErrors.removedClassInSpark2Error(className, e)
              } else {
                throw e
              }
          }
        case head :: Nil =>
          // there is exactly one registered alias
          head.getClass
        case sources =>
          // There are multiple registered aliases for the input. If there is single datasource
          // that has "org.apache.spark" package in the prefix, we use it considering it is an
          // internal datasource within Spark.
          val sourceNames = sources.map(_.getClass.getName)
          val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
          if (internalSources.size == 1) {
            logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString("")}), " +
              s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
            internalSources.head.getClass
          } else {
            throw QueryCompilationErrors.findMultipleDataSourceError(provider1, sourceNames)
          }
      }
    } catch {
      case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
        // NoClassDefFoundError's class name uses "/" rather than "." for packages
        val className = e.getCause.getMessage.replaceAll("/"".")
        if (spark2RemovedClasses.contains(className)) {
          throw QueryExecutionErrors.incompatibleDataSourceRegisterError(e)
        } else {
          throw e
        }
    }
  }

看一下Spark3DefaultSource

class Spark3DefaultSource extends DefaultSource with DataSourceRegister with TableProvider {

  override def shortName(): String "hudi"

  def inferSchema: StructType = new StructType()

  override def inferSchema(options: CaseInsensitiveStringMap): StructType inferSchema

  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: java.util.Map[String, String])
: Table 
= {
    val options = new CaseInsensitiveStringMap(properties)
    val path = options.get("path")
    if (path == nullthrow new HoodieException("'path' cannot be null, missing 'path' from table properties")

    HoodieInternalV2Table(SparkSession.active, path)
  }
}

可以看到shortName
等于hudi,而其他的 DefaultSource:hudi_v1,HoodieParquetFileFormat:Hoodie-Parquet,另外Spark2对应的为Spark2DefaultSource

class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
  override def shortName(): String "hudi"
}

所以cls
Spark3DefaultSource

    cls.newInstance() match {
      case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
      case t: TableProvider
          if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
        Some(t)
      case _ => None
    }

Spark3DefaultSource
既是DataSourceRegister
的子类又是TableProvider
的子类,所以会先匹配DataSourceRegister
,但是由于useV1Sources
不包含hudi
,所以继续匹配TableProvider
,useV1Sources
不包含Spark3DefaultSource
,所以返回Some(Spark3DefaultSource)

    DataSource.lookupDataSourceV2(provider, conf) match {
      // TODO(SPARK-28396): Currently file source v2 can't work with tables.
      case Some(_: FileDataSourceV2) => false
      case Some(_) 
=> true
      case _ => false
    }

Spark3DefaultSource
不是FileDataSourceV2
的子类,所以会匹配到case Some(_) => true
返回true,所以hudi是V2Provider

最新代码

通过上面讲的我们知道了为啥hudi是V2Provider
,所以最终CTAS会调用HoodieCatalog.createHoodieTable
,而不是调用CreateHoodieTableAsSelectCommand
,这就是和Spark2不同的关键点,不过最新代码又改成和Spark2一样调用CreateHoodieTableAsSelectCommand
,不走HoodieCatalog
的逻辑了,原因是因为把Spark3DefaultSource
注释掉了一部分

class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {

  override def shortName(): String "hudi"

  /*
  def inferSchema: StructType = new StructType()

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema

  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: java.util.Map[String, String]): Table = {
    val options = new CaseInsensitiveStringMap(properties)
    val path = options.get("path")
    if (path == null) throw new HoodieException("'path' cannot be null, missing 'path' from table properties")

    HoodieInternalV2Table(SparkSession.active, path)
  }
  */

}

我们看到Spark3DefaultSource
不是TableProvider
的子类了,所以最终isV2Provider
返回false,后面的逻辑就和Spark2的一样了。

相关PR:https://github.com/apache/hudi/pull/5737

我们可以打印一下它的执行计划

== Parsed Logical Plan ==
'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
   +- OneRowRelation

== Analyzed Logical Plan ==
CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
   +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
      +- OneRowRelation

== Optimized Logical Plan ==
CommandResult Execute CreateHoodieTableAsSelectCommand
   +- CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
         +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
            +- OneRowRelation

== Physical Plan ==
CommandResult <empty>
   +- Execute CreateHoodieTableAsSelectCommand
         +- CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
               +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
                  +- OneRowRelation

可以看到确实和Spark2一样也是通过CreateHoodieTableAsSelectCommand
来实现Hudi逻辑的


文章转载自伦少的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论