Preface
The previous article, "Hudi Spark SQL Source Code Study - Create Table", summarized the source-level execution logic of Create Table. This article continues with CTAS. The reason for covering CTAS is that, while working on a PR I submitted, I found that the CTAS logic differs between Spark2 and Spark3.2.1 and the Hudi implementation class it eventually reaches differs as well, so this article analyzes the two versions, Spark2 and Spark3.2.1, separately.
Differences
First, a summary of how the overall logic differs between Spark2 and Spark3.2.1.
Spark2: visitCreateTable -> CreateTable -> CreateHoodieTableAsSelectCommand.run
Spark3.2.1 (assuming spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog is configured; without it the flow is the same as Spark2):
visitCreateTable -> CreateTableAsSelectStatement -> isV2Provider -> true -> CreateTableAsSelect -> HoodieCatalog.createHoodieTable
visitCreateTable -> CreateTableAsSelectStatement -> isV2Provider -> false -> CreateTable -> CreateHoodieTableAsSelectCommand.run
There are two key points on which Spark2 and Spark3.2.1 differ:
1. The configuration spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
2. isV2Provider("hudi") returning true
If either one does not hold, the Spark3.2.1 logic is the same as Spark2's. The PR that introduced HoodieCatalog and made hudi a V2Provider is https://github.com/apache/hudi/pull/4611. The latest master code has since changed isV2Provider("hudi") on Spark3.2.1 back to false, so the Spark2 and Spark3.2.1 logic is consistent again; see PR https://github.com/apache/hudi/pull/5737.
Version
Hudi: this article debugs and analyzes the code corresponding to https://github.com/apache/hudi/pull/5592, because it was while contributing that PR that I discovered the CTAS logic differs between Spark3.2.1 and Spark2.
Example code
As before, we take the test statement from TestCreateTable in the source code directly:
spark.sql(
s"""
| create table $tableName using hudi
| partitioned by (dt)
| tblproperties(
| hoodie.database.name = "databaseName",
| hoodie.table.name = "tableName",
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.datasource.write.operation = 'upsert',
| type = '$tableType'
| )
| AS
| select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
""".stripMargin
)
One thing worth mentioning is how this spark session is created, because it matters when analyzing the Spark3.2.1 logic, so it is pasted up front:
protected lazy val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("hoodie sql test")
.withExtensions(new HoodieSparkSessionExtension)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("hoodie.insert.shuffle.parallelism", "4")
.config("hoodie.upsert.shuffle.parallelism", "4")
.config("hoodie.delete.shuffle.parallelism", "4")
.config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
.config("spark.sql.session.timeZone", "CTT")
.config(sparkConf())
.getOrCreate()
def sparkConf(): SparkConf = {
val sparkConf = new SparkConf()
if (HoodieSparkUtils.gteqSpark3_2) {
sparkConf.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
}
sparkConf
}
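As a quick sanity check (my own sketch, not part of the original test code), you can verify in that session whether the catalog override actually took effect; the expected value assumes the Spark 3.2.1 profile:
// Prints the configured session catalog implementation.
// With the config above on Spark 3.2.1 this should print org.apache.spark.sql.hudi.catalog.HoodieCatalog;
// on Spark2 the key is simply not set.
println(spark.conf.get("spark.sql.catalog.spark_catalog", "not set"))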
Printing the execution plan
As in the previous article, we first print the plan to make the analysis easier.
config("spark.sql.planChangeLog.level", "INFO")
val df = spark.sql(ctasSql)
df.explain(true)
The difference from the previous article is the extra config "spark.sql.planChangeLog.level", "INFO". It was not added in the previous article because this config only exists since Spark 3.1.0 and therefore has no effect on the Spark2 code, but it is quite useful when analyzing the Spark3.2.1 execution plan. One more note: with this config enabled, the information is printed through logBasedOnLevel(message). Three methods call logBasedOnLevel: logRule, which prints the oldPlan and newPlan when a rule takes effect; logBatch, which prints the before/after state of a Batch; and logMetrics, which prints overall metrics. None of these methods is called inside planner.plan, so it does not help with figuring out which strategies take effect, but it is very useful for seeing which rules take effect in the analysis phase.
private def logBasedOnLevel(f: => String): Unit = {
logLevel match {
case "TRACE" => logTrace(f)
case "DEBUG" => logDebug(f)
case "INFO" => logInfo(f)
case "WARN" => logWarning(f)
case "ERROR" => logError(f)
case _ => logTrace(f)
}
}
def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
if (!newPlan.fastEquals(oldPlan)) {
if (logRules.isEmpty || logRules.get.contains(ruleName)) {
def message(): String = {
s"""
|=== Applying Rule $ruleName ===
|${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
""".stripMargin
}
logBasedOnLevel(message)
}
}
}
def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
def message(): String = {
if (!oldPlan.fastEquals(newPlan)) {
s"""
|=== Result of Batch $batchName ===
|${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
""".stripMargin
} else {
s"Batch $batchName has no effect."
}
}
logBasedOnLevel(message)
}
}
def logMetrics(metrics: QueryExecutionMetrics): Unit = {
val totalTime = metrics.time / NANOS_PER_SECOND.toDouble
val totalTimeEffective = metrics.timeEffective / NANOS_PER_SECOND.toDouble
val message =
s"""
|=== Metrics of Executed Rules ===
|Total number of runs: ${metrics.numRuns}
|Total time: $totalTime seconds
|Total number of effective runs: ${metrics.numEffectiveRuns}
|Total time of effective runs: $totalTimeEffective seconds
""".stripMargin
logBasedOnLevel(message)
}
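One more hedged aside (these keys exist since Spark 3.1 and are not used in the article's test code): the logRules / logBatches checks above are driven by spark.sql.planChangeLog.rules and spark.sql.planChangeLog.batches, so the output can be narrowed to just the rules we care about, for example the ResolveSessionCatalog rule that shows up later:
import org.apache.spark.sql.SparkSession

// Sketch: only log plan changes made by ResolveSessionCatalog instead of every rule.
val sparkWithRuleLog = SparkSession.builder()
  .master("local[1]")
  .appName("plan change log demo")
  .config("spark.sql.planChangeLog.level", "INFO")
  .config("spark.sql.planChangeLog.rules",
    "org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog")
  .getOrCreate()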
Spark2
The Spark2 logic is much the same as in the previous article. Since it has already been summarized there, this article only covers the differences; once the previous article's logic is understood, the CTAS logic is fairly straightforward.
Printed output
== Parsed Logical Plan ==
'CreateTable `h0`, ErrorIfExists
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Analyzed Logical Plan ==
CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Optimized Logical Plan ==
CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Physical Plan ==
Execute CreateHoodieTableAsSelectCommand
+- CreateHoodieTableAsSelectCommand `h0`, ErrorIfExists
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
singleStatement
Following the logic from the previous article, this CTAS statement likewise maps to SqlBase.g4 in the Spark source:
singleStatement
: statement EOF
;
statement
: query #statementDefault
| USE db=identifier #use
| CREATE DATABASE (IF NOT EXISTS)? identifier
(COMMENT comment=STRING)? locationSpec?
(WITH DBPROPERTIES tablePropertyList)? #createDatabase
| ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase
| createTableHeader ('(' colTypeList ')')? tableProvider
((OPTIONS options=tablePropertyList) |
(PARTITIONED BY partitionColumnNames=identifierList) |
bucketSpec |
locationSpec |
(COMMENT comment=STRING) |
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #createTable
| createTableHeader ('(' columns=colTypeList ')')?
((COMMENT comment=STRING) |
(PARTITIONED BY '(' partitionColumns=colTypeList ')') |
bucketSpec |
skewSpec |
rowFormat |
createFileFormat |
locationSpec |
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #createHiveTable
......
tableProvider
: USING qualifiedName
;
The difference here is that query is non-empty ((AS? query)), so visitCreateTable returns CreateTable(tableDesc, mode, Some(query)):
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
if (external) {
operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
}
checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
// the provider is hudi
val provider = ctx.tableProvider.qualifiedName.getText
val schema = Option(ctx.colTypeList()).map(createSchema)
val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
val storage = DataSource.buildStorageFormatFromOptions(options)
if (location.isDefined && storage.locationUri.isDefined) {
throw new ParseException(
"LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
"you can only specify one of them.", ctx)
}
val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI))
val tableType = if (customLocation.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
}
val tableDesc = CatalogTable(
identifier = table,
tableType = tableType,
storage = storage.copy(locationUri = customLocation),
schema = schema.getOrElse(new StructType),
provider = Some(provider),
partitionColumnNames = partitionColumnNames,
bucketSpec = bucketSpec,
properties = properties,
comment = Option(ctx.comment).map(string))
// Determine the storage mode.
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
if (ctx.query != null) {
// Get the backing query.
val query = plan(ctx.query)
if (temp) {
operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
}
// Don't allow explicit specification of schema for CTAS
if (schema.nonEmpty) {
operationNotAllowed(
"Schema may not be specified in a Create Table As Select (CTAS) statement",
ctx)
}
CreateTable(tableDesc, mode, Some(query))
} else {
if (temp) {
if (ifNotExists) {
operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
}
logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
"CREATE TEMPORARY VIEW ... USING ... instead")
// Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support
// IF NOT EXISTS. Users are not allowed to replace the existing temp table.
CreateTempViewUsing(table, schema, replace = false, global = false, provider, options)
} else {
CreateTable(tableDesc, mode, None)
}
}
}
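A small hedged way (not from the article) to see the parser-level difference directly is to parse the CTAS text without executing it and inspect the resulting plan class, which matches the parsed plans printed in this article:
// Parse only, no execution: Spark2 produces CreateTable here,
// Spark 3.2.1 produces CreateTableAsSelectStatement.
val parsed = spark.sessionState.sqlParser.parsePlan(
  "create table h0 using hudi partitioned by (dt) as select 1 as id, '2021-04-01' as dt")
println(parsed.getClass.getSimpleName)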
analysis
In the analysis phase, the plan is then matched in the apply method of HoodieAnalysis, one of Hudi's custom rules in customResolutionRules:
case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
with SparkAdapterSupport {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
// Convert to MergeIntoHoodieTableCommand
case m @ MergeIntoTable(target, _, _, _, _)
if m.resolved && sparkAdapter.isHoodieTable(target, sparkSession) =>
MergeIntoHoodieTableCommand(m)
// Convert to UpdateHoodieTableCommand
case u @ UpdateTable(table, _, _)
if u.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
UpdateHoodieTableCommand(u)
// Convert to DeleteHoodieTableCommand
case d @ DeleteFromTable(table, _)
if d.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
DeleteHoodieTableCommand(d)
// Convert to InsertIntoHoodieTableCommand
case l if sparkAdapter.isInsertInto(l) =>
val (table, partition, query, overwrite, _) = sparkAdapter.getInsertIntoChildren(l).get
table match {
case relation: LogicalRelation if sparkAdapter.isHoodieTable(relation, sparkSession) =>
new InsertIntoHoodieTableCommand(relation, query, partition, overwrite)
case _ =>
l
}
// Convert to CreateHoodieTableAsSelectCommand
case CreateTable(table, mode, Some(query))
if query.resolved && sparkAdapter.isHoodieTable(table) =>
CreateHoodieTableAsSelectCommand(table, mode, query)
// Convert to CompactionHoodieTableCommand
case CompactionTable(table, operation, options)
if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentifier(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
CompactionHoodieTableCommand(catalogTable, operation, options)
// Convert to CompactionHoodiePathCommand
case CompactionPath(path, operation, options) =>
CompactionHoodiePathCommand(path, operation, options)
// Convert to CompactionShowOnTable
case CompactionShowOnTable(table, limit)
if sparkAdapter.isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentifier(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
CompactionShowHoodieTableCommand(catalogTable, limit)
// Convert to CompactionShowHoodiePathCommand
case CompactionShowOnPath(path, limit) =>
CompactionShowHoodiePathCommand(path, limit)
// Convert to HoodieCallProcedureCommand
case c@CallCommand(_, _) =>
val procedure: Option[Procedure] = loadProcedure(c.name)
val input = buildProcedureArgs(c.args)
if (procedure.nonEmpty) {
CallProcedureHoodieCommand(procedure.get, input)
} else {
c
}
case _ => plan
}
}
Here the plan is converted into CreateHoodieTableAsSelectCommand, which, like CreateHoodieTableCommand, is a subclass of Command:
case class CreateHoodieTableAsSelectCommand(
table: CatalogTable,
mode: SaveMode,
query: LogicalPlan) extends HoodieLeafRunnableCommand {
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)
val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = table.identifier.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString
if (sessionState.catalog.tableExists(tableIdentWithDB)) {
assert(mode != SaveMode.Overwrite,
s"Expect the table $tableName has been dropped when the save mode is Overwrite")
if (mode == SaveMode.ErrorIfExists) {
throw new RuntimeException(s"Table $tableName already exists. You need to drop it first.")
}
if (mode == SaveMode.Ignore) {
// Since the table already exists and the save mode is Ignore, we will just return.
// scalastyle:off
return Seq.empty
// scalastyle:on
}
}
......
So ExecutedCommandExec.executeCollect is eventually called, which triggers the run method overridden by CreateHoodieTableAsSelectCommand and thus Hudi's own logic. Hudi's own logic can be debugged and traced in the Hudi source; this article will not summarize it.
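As a minimal standalone illustration of that trigger path (my own toy sketch; the real classes are ExecutedCommandExec and the command's run method), the pattern is a lazily evaluated result whose first access performs the side effect:
// Toy model of the eager-command pattern: run() is only invoked when executeCollect()
// first touches the lazy result, which is the moment the Hudi table would get created.
trait ToyCommand { def run(): Seq[String] }

class ToyExecutedCommand(cmd: ToyCommand) {
  private lazy val sideEffectResult: Seq[String] = cmd.run()
  def executeCollect(): Array[String] = sideEffectResult.toArray
}

object EagerCommandDemo extends App {
  val ctas = new ToyCommand {
    override def run(): Seq[String] = { println("creating table ..."); Seq.empty }
  }
  new ToyExecutedCommand(ctas).executeCollect() // prints "creating table ..."
}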
Spark3.2.1
Hudi supports different Spark versions; the default is Spark2.4.4. To use Spark3.2.1, build and package with the following command:
mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12
To debug with Spark3.2.1, you can either first package (or install to the local repository) with the command above and then create a new test project that references the packages you built, or configure the IDEA environment for Spark3.2.1 directly in the Hudi source, which is more cumbersome; I use the second approach. As before, only the key differences are covered here.
Printed output
The printed output for Spark3.2.1 is a bit more complete than for Spark2; we can see that the final Physical Plan involves AtomicCreateTableAsSelect and HoodieCatalog:
== Parsed Logical Plan ==
'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Analyzed Logical Plan ==
CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Optimized Logical Plan ==
CommandResult AtomicCreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, owner=dongkelun01, type=cow, preCombineField=ts], [], false
+- CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Physical Plan ==
CommandResult <empty>
+- AtomicCreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@6dbbdf92, default.h0, [dt], Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, owner=dongkelun01, type=cow, preCombineField=ts], [], false
+- *(1) Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- *(1) Scan OneRowRelation[]
The PlanChangeLogger output is fairly verbose: it prints which rules had no effect, which ones did, and how they took effect. Since there is a lot of it, only a small part is shown here:
7720 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Substitution has no effect.
7721 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Disable Hints has no effect.
7724 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Hints has no effect.
7728 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Simple Sanity Check has no effect.
8309 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger -
=== Applying Rule org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog ===
!'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@c3719e5, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4] +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation +- OneRowRelation
8331 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger -
=== Result of Batch Resolution ===
!'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false CreateTableAsSelect org.apache.spark.sql.hudi.catalog.HoodieCatalog@c3719e5, default.h0, [dt], [provider=hudi, hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4] +- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation +- OneRowRelation
8334 [ScalaTest-run-running-TestCreateTable] INFO org.apache.spark.sql.catalyst.rules.PlanChangeLogger - Batch Remove TempResolvedColumn has no effect.
......
=== Metrics of Executed Rules ===
Total number of runs: 141
Total time: 0.6459626 seconds
Total number of effective runs: 2
Total time of effective runs: 0.302878 seconds
Because the lines are long and wrap, the side-by-side comparison of oldPlan and newPlan is not very clear, but the before/after change is still roughly visible; you can also debug and compare it yourself.
ANTLR
The previous article mentioned that Hudi has three g4 files: one in the hudi-spark module and two more in the hudi-spark2 module. Likewise, the hudi-spark3 module also has two g4 files with the same names, but their contents differ from the Spark2 ones. Specifically:
HoodieSqlCommon.g4 in the hudi-spark module
SqlBase.g4 in the hudi-spark3 module, copied from SqlBase.g4 in the Spark 3.2.0 source
HoodieSqlBase.g4 in the hudi-spark3 module, which imports the SqlBase.g4 above
HoodieSqlBase.g4:
grammar HoodieSqlBase;
import SqlBase;
singleStatement
: statement EOF
;
statement
: query #queryStatement
| ctes? dmlStatementNoWith #dmlStatement
| createTableHeader ('(' colTypeList ')')? tableProvider?
createTableClauses
(AS? query)? #createTable
| .*? #passThrough
;
parsing
As before, parsePlan first goes through HoodieCommonSqlParser.parsePlan, which is shared code; just as with Spark2, builder.visit returns null here, so sparkExtendedParser.parsePlan is called:
private lazy val builder = new HoodieSqlCommonAstBuilder(session, delegate)
private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
.map(_(session, delegate)).getOrElse(delegate)
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _=> sparkExtendedParser.parsePlan(sqlText)
}
}
The previous article showed that for Spark2, sparkExtendedParser is HoodieSpark2ExtendedSqlParser. So what is it for Spark3? It is simple; the logic is the same as for Spark2, so let's take a look:
private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
.map(_(session, delegate)).getOrElse(delegate)
lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSparkUtils.isSpark3_2) {
"org.apache.spark.sql.adapter.Spark3_2Adapter"
} else if (HoodieSparkUtils.isSpark3_0 || HoodieSparkUtils.isSpark3_1) {
"org.apache.spark.sql.adapter.Spark3_1Adapter"
} else {
"org.apache.spark.sql.adapter.Spark2Adapter"
}
getClass.getClassLoader.loadClass(adapterClass)
.newInstance().asInstanceOf[SparkAdapter]
}
From the code above, sparkAdapter is Spark3_2Adapter; next, look at Spark3_2Adapter.createExtendedSparkParser:
override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
Some(
(spark: SparkSession, delegate: ParserInterface) => new HoodieSpark3_2ExtendedSqlParser(spark, delegate)
)
}
So for Spark3, sparkExtendedParser is HoodieSpark3_2ExtendedSqlParser. Next comes HoodieSpark3_2ExtendedSqlParser.parsePlan, where the logic differs from Spark2:
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _=> delegate.parsePlan(sqlText)
}
}
protected def parse[T](command: String)(toResult: HoodieSqlBaseParser => T): T = {
logDebug(s"Parsing command: $command")
val lexer = new HoodieSqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
val tokenStream = new CommonTokenStream(lexer)
val parser = new HoodieSqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
// parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
parser.SQL_standard_keyword_behavior = conf.ansiEnabled
try {
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
toResult(parser)
}
catch {
case e: ParseCancellationException =>
// if we fail, parse with LL mode
tokenStream.seek(0) // rewind input stream
parser.reset()
// Try Again.
parser.getInterpreter.setPredictionMode(PredictionMode.LL)
toResult(parser)
}
}
catch {
case e: ParseException if e.command.isDefined =>
throw e
case e: ParseException =>
throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position)
}
}
You can see that the parser here is likewise HoodieSqlBaseParser, but builder.visit(parser.singleStatement()) behaves differently on Spark3 than on Spark2. Why? Let's keep going.
In Spark3's HoodieSqlBase.g4, the statement rule does contain #createTable:
| createTableHeader ('(' colTypeList ')')? tableProvider?
createTableClauses
(AS? query)? #createTable
Here createTableHeader, tableProvider, AS, and query are all imported from SqlBase.g4, so the CTAS statement can be matched. As described before, parser.singleStatement() eventually leads to builder.visitCreateTable; the difference is that the builder here is HoodieSpark3_2ExtendedSqlAstBuilder, so we need to look at how its visitCreateTable differs:
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
visitCreateTableClauses(ctx.createTableClauses())
if (provider.isDefined && serdeInfo.isDefined) {
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
}
if (temp) {
val asSelect = if (ctx.query == null) "" else " AS ..."
operationNotAllowed(
s"CREATE TEMPORARY TABLE ...$asSelect, use CREATE TEMPORARY VIEW instead", ctx)
}
val partitioning = partitionExpressions(partTransforms, partCols, ctx)
Option(ctx.query).map(plan) match {
case Some(_) if columns.nonEmpty =>
operationNotAllowed(
"Schema may not be specified in a Create Table As Select (CTAS) statement",
ctx)
case Some(_) if partCols.nonEmpty =>
// non-reference partition columns are not allowed because schema can't be specified
operationNotAllowed(
"Partition column types may not be specified in Create Table As Select (CTAS)",
ctx)
case Some(query) =>
CreateTableAsSelectStatement(
table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)
case _ =>
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
}
}
This matches case Some(query) and returns CreateTableAsSelectStatement, which is the difference from Spark2 (or rather from visitCreateTable in the Spark source): Spark2 returns CreateTable(tableDesc, mode, Some(query)). So where is CreateTableAsSelectStatement handled?
ResolveCatalogs and ResolveSessionCatalog
Two rule classes match CreateTableAsSelectStatement: ResolveCatalogs is defined in the Analyzer's batches, while ResolveSessionCatalog is defined in the extendedResolutionRules overridden in BaseSessionStateBuilder.analyzer (from the PlanChangeLogger output we already know the one that actually takes effect is ResolveSessionCatalog).
override def batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
// very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
// at the beginning of analysis.
OptimizeUpdateFields,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
SubstituteUnresolvedOrdinals),
Batch("Disable Hints", Once,
new ResolveHints.DisableHints),
Batch("Hints", fixedPoint,
ResolveHints.ResolveJoinStrategyHints,
ResolveHints.ResolveCoalesceHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions(v1SessionCatalog) ::
ResolveNamespace(catalogManager) ::
new ResolveCatalogs(catalogManager) ::
ResolveUserSpecifiedColumns ::
ResolveInsertInto ::
ResolveRelations ::
......
extendedResolutionRules : _*),
.......
protected def analyzer: Analyzer = new Analyzer(catalogManager) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
new FallBackFileSourceV2(session) +:
ResolveEncodersInScalaAgg +:
new ResolveSessionCatalog(catalogManager) +:
ResolveWriteToStream +:
customResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
DetectAmbiguousSelfJoin +:
PreprocessTableCreation(session) +:
PreprocessTableInsertion +:
DataSourceAnalysis +:
customPostHocResolutionRules
override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
PreReadCheck +:
HiveOnlyCheck +:
TableCapabilityCheck +:
CommandCheck +:
customCheckRules
}
Their respective apply methods are:
class ResolveCatalogs(val catalogManager: CatalogManager)
extends Rule[LogicalPlan] with LookupCatalog {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
CreateV2Table(
catalog.asTableCatalog,
tbl.asIdentifier,
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
ignoreIfExists = c.ifNotExists)
case c @ CreateTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.writeOptions,
ignoreIfExists = c.ifNotExists)
class ResolveSessionCatalog(val catalogManager: CatalogManager)
extends Rule[LogicalPlan] with LookupCatalog {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
cols.foreach { c =>
assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
if (!c.nullable) {
throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError
}
}
AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
......
case c @ CreateTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.provider, c.options, c.location, c.serde, ctas = true)
if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
c.partitioning, c.bucketSpec, c.properties, provider, c.location,
c.comment, storageFormat, c.external)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, Some(c.asSelect))
} else {
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.writeOptions,
ignoreIfExists = c.ifNotExists)
}
......
Both rules try to match CreateTableAsSelectStatement; the difference is that one matches NonSessionCatalogAndTable while the other matches SessionCatalogAndTable. Judging by the names, their checks are the inverse of each other, so one of them always matches. What do they actually do? Taking SessionCatalogAndTable as the example: when Scala pattern-matches with such an extractor object, it calls its unapply method. The nameParts argument here is the first parameter of CreateTableAsSelectStatement, which, as seen in visitCreateTable above, is the table returned by visitCreateTableHeader(ctx.createTableHeader).
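Before reading the actual Spark extractors, here is a minimal standalone sketch (names made up for illustration) of the extractor-object pattern they rely on: a case X(...) pattern against a plain object invokes X.unapply.
// Toy extractor: `case TwoPartName(db, table)` calls TwoPartName.unapply(parts).
object TwoPartName {
  def unapply(parts: Seq[String]): Option[(String, String)] = parts match {
    case Seq(db, table) => Some(db -> table)
    case _              => None
  }
}

object ExtractorDemo extends App {
  Seq("default", "h0") match {
    case TwoPartName(db, table) => println(s"db=$db, table=$table") // db=default, table=h0
    case _                      => println("no match")
  }
}
With that in mind, the actual extractors in the Spark source are: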
object SessionCatalogAndTable {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
case SessionCatalogAndIdentifier(catalog, ident) =>
Some(catalog -> ident.asMultipartIdentifier)
case _ => None
}
}
object SessionCatalogAndIdentifier {
def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
case CatalogAndIdentifier(catalog, ident) if CatalogV2Util.isSessionCatalog(catalog) =>
Some(catalog, ident)
case _ => None
}
}
object CatalogAndIdentifier {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
assert(nameParts.nonEmpty)
if (nameParts.length == 1) { // nameParts.length == 1 means only a table name, no database name
Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
} else if (nameParts.head.equalsIgnoreCase(globalTempDB)) { // nameParts.head is the database name; check whether it equals globalTempDB
// Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
// API does not support view yet, and we have to use v1 commands to deal with global temp
// views. To simplify the implementation, we put global temp views in a special namespace
// in the session catalog. The special namespace has higher priority during name resolution.
// For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
// this custom catalog can't be accessed.
Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
} else {
try {
// otherwise, obtain the catalog via catalogManager.catalog(nameParts.head)
Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
} catch {
case _: CatalogNotFoundException =>
Some((currentCatalog, nameParts.asIdentifier))
}
}
}
}
def currentCatalog: CatalogPlugin = catalogManager.currentCatalog
Eventually CatalogAndIdentifier.unapply is reached. Since our create-table statement has no database qualifier, nameParts here is Seq(tableName), i.e. its length equals 1, so it returns Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head))), whose value is Some((HoodieCatalog, Identifier.of(default, tableName))). The reason the catalog is HoodieCatalog is the configuration mentioned at the beginning:
def sparkConf(): SparkConf = {
val sparkConf = new SparkConf()
if (HoodieSparkUtils.gteqSpark3_2) {
sparkConf.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
}
sparkConf
}
Now look at CatalogV2Util.isSessionCatalog:
def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
}
private[sql] object CatalogManager {
val SESSION_CATALOG_NAME: String = "spark_catalog"
}
Here HoodieCatalog.name() is implemented in V2SessionCatalog:
class V2SessionCatalog(catalog: SessionCatalog)
extends TableCatalog with SupportsNamespaces with SQLConfHelper {
import V2SessionCatalog._
override val defaultNamespace: Array[String] = Array("default")
override def name: String = CatalogManager.SESSION_CATALOG_NAME
So CatalogV2Util.isSessionCatalog(catalog) is true. NonSessionCatalogAndTable is exactly the opposite: !CatalogV2Util.isSessionCatalog(catalog) returns false.
object NonSessionCatalogAndTable {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
case NonSessionCatalogAndIdentifier(catalog, ident) =>
Some(catalog -> ident.asMultipartIdentifier)
case _ => None
}
}
object NonSessionCatalogAndIdentifier {
def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
case CatalogAndIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
Some(catalog, ident)
case _ => None
}
}
So the match finally succeeds in ResolveSessionCatalog:
case c @ CreateTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.provider, c.options, c.location, c.serde, ctas = true)
if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
c.partitioning, c.bucketSpec, c.properties, provider, c.location,
c.comment, storageFormat, c.external)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, Some(c.asSelect))
} else {
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.writeOptions,
ignoreIfExists = c.ifNotExists)
}
The logic here checks whether the provider is a V2Provider: if not, it returns CreateTable; if so, it returns CreateTableAsSelect. The V2Provider check involves quite a bit of logic, so it is deferred to a later section. In the code version used in this article, hudi is a V2Provider, so CreateTableAsSelect is returned, and this is the key difference from Spark2; if it were not a V2Provider, CreateTable would be returned just as in Spark2. As covered below, CreateTableAsSelect eventually calls HoodieCatalog to create the Hudi table, whereas CreateTable, as we saw in the Spark2 section above, eventually calls CreateHoodieTableAsSelectCommand.
case class CreateTableAsSelect(
catalog: TableCatalog,
tableName: Identifier,
partitioning: Seq[Transform],
query: LogicalPlan,
properties: Map[String, String],
writeOptions: Map[String, String],
ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
trait UnaryCommand extends Command with UnaryLike[LogicalPlan]
So where is CreateTableAsSelect converted, and where does it finally call into Hudi's logic?
planning
@transient private[sql] val logicalPlan: LogicalPlan = {
val plan = queryExecution.commandExecuted
if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
dsIds.add(id)
plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
}
plan
}
lazy val commandExecuted: LogicalPlan = mode match {
case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
case CommandExecutionMode.SKIP => analyzed
}
private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
case c: Command =>
val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {
qe.executedPlan.executeCollect()
}
CommandResult(
qe.analyzed.output,
qe.commandExecuted,
qe.executedPlan,
result)
case other => other
}
CreateTableAsSelect is also of Command type. Just like Spark2 in the previous article, Spark3 triggers the subsequent planning phase through the logicalPlan field when the new Dataset is constructed.
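A hedged usage note that follows from this: because the CTAS plan is a Command, the table is already created by the time spark.sql(...) returns, so no action such as collect() is needed to trigger it.
// The CTAS side effect happens eagerly while the Dataset is being constructed
// (Spark 3.2.x goes through eagerlyExecuteCommands, Spark2 through ExecutedCommandExec).
val ctasDf = spark.sql(ctasSql)      // table h0 is created right here
spark.sql("select * from h0").show() // only verifies the result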
DataSourceV2Strategy
Among the strategies of SparkPlanner there is a DataSourceV2Strategy:
class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
extends SparkStrategies with SQLConfHelper {
def numPartitions: Int = conf.numShufflePartitions
override def strategies: Seq[Strategy] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
LogicalQueryStageStrategy ::
PythonEvals ::
new DataSourceV2Strategy(session) ::
FileSourceStrategy ::
DataSourceStrategy ::
SpecialLimits ::
Aggregation ::
Window ::
JoinSelection ::
InMemoryScans ::
SparkScripts ::
BasicOperators :: Nil)
Look at its apply method:
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters,
......
case WriteToDataSourceV2(relationOpt, writer, query, customMetrics) =>
val invalidateCacheFunc: () => Unit = () => relationOpt match {
case Some(r) => session.sharedState.cacheManager.uncacheQuery(session, r, cascade = true)
case None => ()
}
WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil
case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
propsWithOwner, writeOptions, ifNotExists) :: Nil
case _ =>
CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
propsWithOwner, writeOptions, ifNotExists) :: Nil
}
......
This matches CreateTableAsSelect and then, depending on the type of the catalog, returns either AtomicCreateTableAsSelectExec or CreateTableAsSelectExec, so we need to check whether the catalog is a StagingTableCatalog.
HoodieCatalog
class HoodieCatalog extends DelegatingCatalogExtension
with StagingTableCatalog
with SparkAdapterSupport
with ProvidesHoodieConfig {
val spark: SparkSession = SparkSession.active
override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
} else {
BasicStagedTable(
ident,
super.createTable(ident, schema, partitions, properties),
this)
}
}
override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
} else {
super.dropTable(ident)
BasicStagedTable(
ident,
super.createTable(ident, schema, partitions, properties),
this)
}
}
override def stageCreateOrReplace(ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(
ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
} else {
try super.dropTable(ident) catch {
case _: NoSuchTableException => // ignore the exception
}
BasicStagedTable(
ident,
super.createTable(ident, schema, partitions, properties),
this)
}
}
From the definition of HoodieCatalog we know it implements StagingTableCatalog, so AtomicCreateTableAsSelectExec is returned.
AtomicCreateTableAsSelectExec
case class AtomicCreateTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
ifNotExists: Boolean) extends TableWriteExecHelper {
override protected def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
if (ifNotExists) {
return Nil
}
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
val stagedTable = catalog.stageCreate(
ident, schema, partitioning.toArray, properties.asJava)
writeToTable(catalog, stagedTable, writeOptions, ident)
}
We can see that AtomicCreateTableAsSelectExec has a run method; so where is this run method triggered from?
V2CommandExec
In fact AtomicCreateTableAsSelectExec is a subclass of V2CommandExec:
case class AtomicCreateTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
ifNotExists: Boolean) extends TableWriteExecHelper {
private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {
trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
The executeCollect of AtomicCreateTableAsSelectExec is implemented in V2CommandExec:
abstract class V2CommandExec extends SparkPlan {
/**
* Abstract method that each concrete command needs to implement to compute the result.
*/
protected def run(): Seq[InternalRow]
/**
* The value of this field can be used as the contents of the corresponding RDD generated from
* the physical plan of this command.
*/
private lazy val result: Seq[InternalRow] = run()
/**
* The `execute()` method of all the physical command classes should reference `result`
* so that the command can be executed eagerly right after the command query is created.
*/
override def executeCollect(): Array[InternalRow] = result.toArray
The eagerlyExecuteCommands shown above calls executeCollect, which in turn calls the run method of AtomicCreateTableAsSelectExec. So is this run method where the Hudi logic is implemented?
override protected def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
if (ifNotExists) {
return Nil
}
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
val stagedTable = catalog.stageCreate(
ident, schema, partitioning.toArray, properties.asJava)
writeToTable(catalog, stagedTable, writeOptions, ident)
}
We can see two places related to HoodieCatalog. One is catalog.stageCreate, which checks whether the table is a Hudi table and, if so, returns HoodieStagedTable, so the stagedTable here is a HoodieStagedTable:
override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
} else {
BasicStagedTable(
ident,
super.createTable(ident, schema, partitions, properties),
this)
}
}
The other is writeToTable, which is implemented in the parent trait TableWriteExecHelper:
private[v2] trait TableWriteExecHelper extends V2TableWriteExec with SupportsV1Write {
protected def writeToTable(
catalog: TableCatalog,
table: Table,
writeOptions: CaseInsensitiveStringMap,
ident: Identifier): Seq[InternalRow] = {
Utils.tryWithSafeFinallyAndFailureCallbacks({
table match {
case table: SupportsWrite =>
val info = LogicalWriteInfoImpl(
queryId = UUID.randomUUID().toString,
query.schema,
writeOptions)
val writeBuilder = table.newWriteBuilder(info)
val write = writeBuilder.build()
val writtenRows = write match {
case v1: V1Write => writeWithV1(v1.toInsertableRelation)
case v2 => writeWithV2(v2.toBatch)
}
table match {
case st: StagedTable => st.commitStagedChanges()
case _ =>
}
writtenRows
case _ =>
// Table does not support writes - staged changes are also rolled back below if table
// is staging.
throw QueryExecutionErrors.unsupportedTableWritesError(ident)
}
})(catchBlock = {
table match {
// Failure rolls back the staged writes and metadata changes.
case st: StagedTable => st.abortStagedChanges()
case _ => catalog.dropTable(ident)
}
})
}
}
Both HoodieStagedTable and BasicStagedTable implement SupportsWrite as well as StagedTable, so the match hits SupportsWrite first and then StagedTable, finally calling commitStagedChanges; other details such as writeBuilder and writeWithV1 are not examined in this article.
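A tiny standalone illustration of that match ordering (my own sketch; the trait names only mimic the real ones): a value implementing both traits is caught by whichever case appears first.
// The first matching case wins, so writeToTable enters the SupportsWrite branch first
// and only afterwards matches StagedTable to call commitStagedChanges.
trait SupportsWriteLike
trait StagedTableLike
class BothLike extends SupportsWriteLike with StagedTableLike

object MatchOrderDemo extends App {
  val table: AnyRef = new BothLike
  table match {
    case _: SupportsWriteLike => println("write branch")
    case _: StagedTableLike   => println("unreachable for BothLike")
  }
}
Back to the actual HoodieStagedTable: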
case class HoodieStagedTable(ident: Identifier,
catalog: HoodieCatalog,
override val schema: StructType,
partitions: Array[Transform],
override val properties: util.Map[String, String],
mode: TableCreationMode) extends StagedTable with SupportsWrite {
private var sourceQuery: Option[DataFrame] = None
private var writeOptions: Map[String, String] = Map.empty
override def commitStagedChanges(): Unit = {
val props = new util.HashMap[String, String]()
val optionsThroughProperties = properties.asScala.collect {
case (k, _) if k.startsWith("option.") => k.stripPrefix("option.")
}.toSet
val sqlWriteOptions = new util.HashMap[String, String]()
properties.asScala.foreach { case (k, v) =>
if (!k.startsWith("option.") && !optionsThroughProperties.contains(k)) {
props.put(k, v)
} else if (optionsThroughProperties.contains(k)) {
sqlWriteOptions.put(k, v)
}
}
if (writeOptions.isEmpty && !sqlWriteOptions.isEmpty) {
writeOptions = sqlWriteOptions.asScala.toMap
}
props.putAll(properties)
props.put("hoodie.table.name", ident.name())
props.put(RECORDKEY_FIELD.key, properties.get("primaryKey"))
catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode)
}
override def name(): String = ident.name()
override def abortStagedChanges(): Unit = {
clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration)
}
private def clearTablePath(tablePath: String, conf: Configuration): Unit = {
val path = new Path(tablePath)
val fs = path.getFileSystem(conf)
fs.delete(path, true)
}
override def capabilities(): util.Set[TableCapability] = Set(TableCapability.V1_BATCH_WRITE).asJava
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
writeOptions = info.options.asCaseSensitiveMap().asScala.toMap
new HoodieV1WriteBuilder
}
/*
* WriteBuilder for creating a Hoodie table.
*/
private class HoodieV1WriteBuilder extends WriteBuilder {
override def build(): V1Write = new V1Write {
override def toInsertableRelation(): InsertableRelation = {
new InsertableRelation {
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
sourceQuery = Option(data)
}
}
}
}
}
}
The commitStagedChanges method calls catalog.createHoodieTable to create the Hudi table, implementing Hudi's own logic. This is why, in the PR mentioned at the beginning, modifying the CTAS code also involved the HoodieCatalog class:
def createHoodieTable(ident: Identifier,
schema: StructType,
partitions: Array[Transform],
allTableProperties: util.Map[String, String],
writeOptions: Map[String, String],
sourceQuery: Option[DataFrame],
operation: TableCreationMode): Table = {
val (partitionColumns, maybeBucketSpec) = convertTransforms(partitions)
val newSchema = schema
val newPartitionColumns = partitionColumns
val newBucketSpec = maybeBucketSpec
val isByPath = isPathIdentifier(ident)
val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location"))
val id = ident.asTableIdentifier
val locUriOpt = location.map(CatalogUtils.stringToURI)
val existingTableOpt = getExistingTableIfExists(id)
val loc = locUriOpt
.orElse(existingTableOpt.flatMap(_.storage.locationUri))
.getOrElse(spark.sessionState.catalog.defaultTablePath(id))
val storage = DataSource.buildStorageFormatFromOptions(writeOptions.--(needFilterProps))
.copy(locationUri = Option(loc))
val tableType =
if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
val commentOpt = Option(allTableProperties.get("comment"))
val tablePropertiesNew = new util.HashMap[String, String](allTableProperties)
isV2Provider
Let's look at the isV2Provider method:
private def isV2Provider(provider: String): Boolean = {
// Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to
// `HiveFileFormat`, when running tests in sql/core.
if (DDLUtils.isHiveTable(Some(provider))) return false
DataSource.lookupDataSourceV2(provider, conf) match {
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
case Some(_: FileDataSourceV2) => false
case Some(_) => true
case _ => false
}
}
def isHiveTable(provider: Option[String]): Boolean = {
provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
}
val HIVE_PROVIDER = "hive"
The provider argument here is hudi, so isHiveTable is false; next, look at lookupDataSourceV2:
/**
* Returns an optional [[TableProvider]] instance for the given provider. It returns None if
* there is no corresponding Data Source V2 implementation, or the provider is configured to
* fallback to Data Source V1 code path.
*/
def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {
val useV1Sources = conf.getConf(SQLConf.USE_V1_SOURCE_LIST).toLowerCase(Locale.ROOT)
.split(",").map(_.trim)
val cls = lookupDataSource(provider, conf)
cls.newInstance() match {
case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
case t: TableProvider
if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
Some(t)
case _ => None
}
}
val USE_V1_SOURCE_LIST = buildConf("spark.sql.sources.useV1SourceList")
.internal()
.doc("A comma-separated list of data source short names or fully qualified data source " +
"implementation class names for which Data Source V2 code path is disabled. These data " +
"sources will fallback to Data Source V1 code path.")
.version("3.0.0")
.stringConf
.createWithDefault("avro,csv,json,kafka,orc,parquet,text")
Here useV1Sources defaults to "avro,csv,json,kafka,orc,parquet,text". Continuing with lookupDataSource: provider1 = hudi and provider2 = hudi.DefaultSource. It then loads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file and returns their contents; the Hudi-related entries include org.apache.hudi.DefaultSource, org.apache.hudi.Spark3DefaultSource, org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat, etc. It then filters for shortName = hudi, which only Spark3DefaultSource satisfies, so Spark3DefaultSource is returned directly:
/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
// provider1 = hudi
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
classOf[OrcDataSourceV2].getCanonicalName
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
"org.apache.spark.sql.hive.orc.OrcFileFormat"
case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
"org.apache.spark.sql.avro.AvroFileFormat"
case name => name
}
// provider2 = hudi.DefaultSource
val provider2 = s"$provider1.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
// This loads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file and returns its entries,
// including org.apache.hudi.DefaultSource, org.apache.hudi.Spark3DefaultSource, org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat, etc.
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
try {
// filter for shortName = hudi; only Spark3DefaultSource matches, so it is returned directly
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
// the provider format did not match any given registered aliases
case Nil =>
try {
Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
case Success(dataSource) =>
// Found the data source using fully qualified path
dataSource
case Failure(error) =>
if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
throw QueryCompilationErrors.orcNotUsedWithHiveEnabledError()
} else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
provider1 == "com.databricks.spark.avro" ||
provider1 == "org.apache.spark.sql.avro") {
throw QueryCompilationErrors.failedToFindAvroDataSourceError(provider1)
} else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
throw QueryCompilationErrors.failedToFindKafkaDataSourceError(provider1)
} else {
throw QueryExecutionErrors.failedToFindDataSourceError(provider1, error)
}
}
} catch {
case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
// NoClassDefFoundError's class name uses "/" rather than "." for packages
val className = e.getMessage.replaceAll("/", ".")
if (spark2RemovedClasses.contains(className)) {
throw QueryExecutionErrors.removedClassInSpark2Error(className, e)
} else {
throw e
}
}
case head :: Nil =>
// there is exactly one registered alias
head.getClass
case sources =>
// There are multiple registered aliases for the input. If there is single datasource
// that has "org.apache.spark" package in the prefix, we use it considering it is an
// internal datasource within Spark.
val sourceNames = sources.map(_.getClass.getName)
val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
if (internalSources.size == 1) {
logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
internalSources.head.getClass
} else {
throw QueryCompilationErrors.findMultipleDataSourceError(provider1, sourceNames)
}
}
} catch {
case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
// NoClassDefFoundError's class name uses "/" rather than "." for packages
val className = e.getCause.getMessage.replaceAll("/", ".")
if (spark2RemovedClasses.contains(className)) {
throw QueryExecutionErrors.incompatibleDataSourceRegisterError(e)
} else {
throw e
}
}
}
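To see what that ServiceLoader step discovers on your own classpath, a hedged sketch (output depends on which Spark/Hudi jars are present):
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Lists every registered DataSourceRegister with its shortName; the entry whose shortName is
// "hudi" is the one kept by the filter in lookupDataSource above.
ServiceLoader.load(classOf[DataSourceRegister]).asScala
  .foreach(r => println(s"${r.shortName()} -> ${r.getClass.getName}"))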
Look at Spark3DefaultSource:
class Spark3DefaultSource extends DefaultSource with DataSourceRegister with TableProvider {
override def shortName(): String = "hudi"
def inferSchema: StructType = new StructType()
override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema
override def getTable(schema: StructType,
partitioning: Array[Transform],
properties: java.util.Map[String, String]): Table = {
val options = new CaseInsensitiveStringMap(properties)
val path = options.get("path")
if (path == null) throw new HoodieException("'path' cannot be null, missing 'path' from table properties")
HoodieInternalV2Table(SparkSession.active, path)
}
}
We can see that its shortName equals hudi, while the others differ: DefaultSource has shortName hudi_v1 and HoodieParquetFileFormat has Hoodie-Parquet. The Spark2 counterpart is Spark2DefaultSource:
class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
override def shortName(): String = "hudi"
}
So cls is Spark3DefaultSource:
cls.newInstance() match {
case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
case t: TableProvider
if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
Some(t)
case _ => None
}
Spark3DefaultSource is a subclass of both DataSourceRegister and TableProvider, so the DataSourceRegister case is tried first; but since useV1Sources does not contain hudi, matching continues to the TableProvider case, and since useV1Sources does not contain Spark3DefaultSource either, Some(Spark3DefaultSource) is returned:
DataSource.lookupDataSourceV2(provider, conf) match {
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
case Some(_: FileDataSourceV2) => false
case Some(_) => true
case _ => false
}
Spark3DefaultSource is not a subclass of FileDataSourceV2, so the match hits case Some(_) => true and returns true; therefore hudi is a V2Provider.
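The same conclusion can be double-checked from a spark-shell with the lookup helpers walked through above (a sketch; the expected result assumes the hudi-spark3 bundle of this code version is on the classpath):
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf

// "hudi" resolves to Spark3DefaultSource, which is a TableProvider but not a FileDataSourceV2,
// so lookupDataSourceV2 returns Some(...) and hudi is treated as a V2 provider.
println(DataSource.lookupDataSource("hudi", SQLConf.get).getName)
println(DataSource.lookupDataSourceV2("hudi", SQLConf.get).map(_.getClass.getName))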
Latest code
From the above we know why hudi is a V2Provider, so CTAS eventually calls HoodieCatalog.createHoodieTable rather than CreateHoodieTableAsSelectCommand, and this is the key difference from Spark2. However, the latest code has changed back to calling CreateHoodieTableAsSelectCommand just like Spark2 and no longer goes through the HoodieCatalog logic, because part of Spark3DefaultSource was commented out:
class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {
override def shortName(): String = "hudi"
/*
def inferSchema: StructType = new StructType()
override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema
override def getTable(schema: StructType,
partitioning: Array[Transform],
properties: java.util.Map[String, String]): Table = {
val options = new CaseInsensitiveStringMap(properties)
val path = options.get("path")
if (path == null) throw new HoodieException("'path' cannot be null, missing 'path' from table properties")
HoodieInternalV2Table(SparkSession.active, path)
}
*/
}
We can see that Spark3DefaultSource is no longer a subclass of TableProvider, so isV2Provider finally returns false, and the remaining logic is the same as Spark2's.
Related PR: https://github.com/apache/hudi/pull/5737
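With that change, the check from the isV2Provider section should now take the fallback path; a hedged one-liner to verify it (assumes the newer bundle on the classpath):
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf

// Spark3DefaultSource no longer extends TableProvider, so no V2 provider is found for "hudi"
// and ResolveSessionCatalog falls back to CreateTable, i.e. the Spark2 path.
assert(DataSource.lookupDataSourceV2("hudi", SQLConf.get).isEmpty)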
We can print its execution plan:
== Parsed Logical Plan ==
'CreateTableAsSelectStatement [h0], [dt], [hoodie.datasource.write.operation=upsert, primaryKey=id, hoodie.table.name=tableName, hoodie.database.name=databaseName, type=cow, preCombineField=ts], hudi, false, false
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Analyzed Logical Plan ==
CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Optimized Logical Plan ==
CommandResult Execute CreateHoodieTableAsSelectCommand
+- CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
== Physical Plan ==
CommandResult <empty>
+- Execute CreateHoodieTableAsSelectCommand
+- CreateHoodieTableAsSelectCommand `default`.`h0`, ErrorIfExists
+- Project [1 AS id#0, a1 AS name#1, 10 AS price#2, 2021-04-01 AS dt#3, 1000 AS ts#4]
+- OneRowRelation
We can see that, just like Spark2, the Hudi logic is indeed implemented through CreateHoodieTableAsSelectCommand.