Spark作为一个分布式计算引擎,其核心模块SQL支持外部数据源扩展,极其提高了其应用场景的广度和深度。自定义外部数据源,网上存在大量的学习资料,但Spark内部如何实现这些扩展机制的资料,去少之又少。我阅读并调试了大量的代码,最终理清了其脉络,由于涉及篇幅较多,我只写了核心过程,掌握这些过程,对理解sql扩展数据源机制会有极大帮助。
由于内容比较枯燥,建议读者结合Spark代码去分析下面的内容
关键抽象逻辑:
Sessionstate抽象类BaseSessionStateBuilder,被每一个应用端sessionstate使用。其内部定义一个Analyzer解析器。数据源相关逻辑规则
1、覆盖extendsResolutionRules并定义FindDataSourceTable规则,其会判断logicalplan是否是SimpleCatalogRelation,如果是,则把外部数据源数据包装成LogicalRelation(Relation),这个转换在创建表、查询和插入阶段均会使用。
2、覆盖postHocResolutionRules,并定义DataSourceAnalysis规则,转换createtable和insertintoTable操作计划。
核心接口:org.apache.spark.sql.sources.interfaces
这个接口内的抽象几乎满足了现有很多需求,如果不满足,可以在此自定义,并在下面关键地方实现自己的逻辑即可扩展。
创建表核心逻辑
1、 SparksqlParser.visitCreateTable传入schema、provider\properties等信息创建CatalogTable并以CreateTable逻辑计划返回,对应该计划不同版本,实现阶段不一样:
2、 2.10版本,在物理计划解析阶段,CreateTable被DDLStrategy转换为CreateDataSourceTablecommand计划,
3、 2.10之后版本,在逻辑计划解析阶段Analyzer(),CreateTable被DataSourceAnalysis 规则转换为CreateDataSourceTablecommand逻辑计划。
4、 CreateDataSourceTablecommand属于是DDL之类定义语句计划,其是RunableCommand的子类。在sparkStrategies的BasicOperators策略中把这个RunableCommand作为参数传入ExecutecommandExec计划里执行,被该计划的sideEffectResult函数调用。
5、 物理执行阶段执行该计划CreateDataSourceTablecommand的run函数,将根据这些创建信息传入并创建DataSource,通过resolveReleation根据provider创建对应的自定义DataSource数据源
查找核心逻辑:
1、 Analyzer阶段解析规则ResolveRelations.lookupTableFromCatalog通过catalog.lookupRelation函数执行逻辑内,如果是外部数据源,则封装为SubqueryAlias(relationAlias,SimpleCatalogRelation,view),然后使用上述扩展生成的FindDataSourceTable规则解析成LogicalRelation[tablescan]
2、 在PhysicalPlan解析阶段,DataSourceStrategy策略会对LogicalRelation逻辑计划进行解析,判断该relation是继承的哪个类型的接口(org.apache.spark.sql.sources.interfaces,并转换为对应的physicalplan, buildscan查询逻辑解析为RowDataSourceScanExec计划
插入核心逻辑:
1、 AstBuilder内的visitSingleInsertQuery访问InsertQueryContent,然后生成InsertIntoTable(UnresolvedRelation,partitionKeys,query,_,_),在Analyzer解析阶段被ResolveRelations解析并返回InsertIntoTable(),FindDataSourceTable规则解析成LogicalRelation,InsertIntoTable插入逻辑被DataSourceAnalysis规则转换为InsertIntoDataSourceCommand逻辑计划。
2、物理优化阶段:在sparkStrategies的BasicOperators策略中把RunableCommand作为参数传入ExecutecommandExec计划里执行,并解析成了ExecutedCommandExec(cmd))计划;
以上内容是经过代码调试的,如果你在调试阶段,有异议地方欢迎交流。
感谢关注:




