暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

扩展Spark Catalyst,打造自定义Spark SQL引擎

百科程序猿 2021-12-28
1699

一、Spark SQL 架构简介

Spark SQLcorecatalysthivehive-thriftserver4个部分组成,整个过程先会将SQL语句进行解析(parse)形成一个Tree,然后使用Rule对Tree进行绑定,优化等处理过程,通过模式匹配对不同类型的节点采用不同操作。而sparksql的查询优化器是catalyst,它负责处理查询语句的解析,绑定,优化和生成物理执行计划等过程,catalyst是sparksql最核心部分。
core: 负责处理数据的输入/输出,从不同的数据源获取数据,然后将结果查询结果输出成Data Frame。
catalyst: 负责查询语句的整个处理过程,包括解析,绑定,优化,生成物理计划等。
hive: 负责对hive数据的处理。
hive-thriftserver:提供client和JDBC/ODBC等接口。


二、Catalyst 架构及执行流程分析
Spark SQL快速的计算效率得益于Catalyst优化器。从HiveQL被解析成语法抽象树起,执行计划生成和优化的工作全部交给Spark SQL的Catalyst优化器进行负责和管理。
SQL语句首先通过Parser模块被解析为语法树,此棵树称为Unresolved Logical Plan,Unresolved Logical Plan通过Analyzer模块借助于Catalog中的表信息解析为Logical Plan,此时Optimizer再通过各种基于规则的优化策略进行深入优化,得到Optimized Logical Plan,优化后的逻辑执行计划依然是逻辑的,并不能被Spark系统理解,此时需要将此逻辑执行计划转换为Physical Plan。
主要流程大概可以分为以下几步:
  1. Sql语句经过Antlr4解析,生成Unresolved Logical Plan。
  2. analyzer与catalog进行绑定(catlog存储元数据),生成Logical Plan。
  3. optimizer对Logical Plan优化,生成Optimized LogicalPlan。
  4. SparkPlan将Optimized LogicalPlan转换成 Physical Plan。
  5. prepareForExecution()将 Physical Plan 转换成 executed Physical Plan。
  6. execute()执行可执行物理计划,得到RDD。

三、Spark Catalyst扩展点

Spark catalyst的扩展点在SPARK-18127中被引入,Spark用户可以在SQL处理的各个阶段扩展自定义实现,非常强大高效,下面我们具体看看其提供的接口和在Spark中的实现。SparkSessionExtensions保存了所有用户自定义的扩展规则,自定义规则保存在成员变量中,对于不同阶段的自定义规则,SparkSessionExtensions提供了不同的接口。
用户可以通过SparkSessionExtensions提供的inject开头的方法添加新的自定义规则,具体的inject接口如下:
  • injectOptimizerRule – 添加optimizer自定义规则,optimizer负责逻辑执行计划的优化。

  • injectParser – 添加parser自定义规则,parser负责SQL解析。

  • injectPlannerStrategy – 添加planner strategy自定义规则,planner负责物理执行计划的生成。

  • injectResolutionRule – 添加Analyzer自定义规则到Resolution阶段,analyzer负责逻辑执行计划生成。

  • injectPostHocResolutionRule – 添加Analyzer自定义规则到Post Resolution阶段。

  • injectCheckRule – 添加Analyzer自定义Check规则。

四、SPARK与HIVE在默认类型提升上不一致?
首先来看看spark的类型提升的优先级(图片来源于官网):

在来分析一下hive的默认类型提升优先级(图片来源于网络):

从以上的类型提升优先级可以看出,spark对字符串类型转数字类型的默认提升是不一致的,spark最低优先级为int类型,而hive的最低优先级类型为double类型,下面我们使用一个实际的例子来展示。

假设我们有一个hive数据表,表中存在这样一个特殊的字段,字段的类型为String,但是实际存放的数据为double的字符串数据,如下表的f1字段。
id [string]f1 [string]f2 [double]
kn00011.51.4
kn00022.52.4
kn00033.53.4
kn00044.54.4
kn00055.55.4
1、SPARK:现在来执行分析这样一个语句:"select * from table where f1 > 3",首先来查看spark的执行计划,如下。从下面的执行计划可以看出,如果给定的条件"f1 > 3"为整数时,执行计划将把f1字段提升为int类型:
    == Physical Plan ==


    *(1) Project [id#10, f1#11, f2#12]


    +- *(1) Filter (isnotnull(f1#11) && (cast(f1#11 as int) > 3))

    过滤得到的结果为:

    idf1f2
    kn00044.54.4
    kn00055.55.4
    2、SPARK:当把条件修改为浮点类型后:"select * from table where f1 > 3.0",再来查看spark的执行计划,如下。从下面的执行计划可以看出,如果给定的条件"f1 > 3.0"为浮点数时,执行计划将把f1字段提升为double类型:
      == Physical Plan ==


      *(1) Project [id#10, f1#11, f2#12]


      +- *(1) Filter (isnotnull(f1#11) && (cast(f1#11 as double) > 3.0))


      过滤得到的结构为:

      idf1f2
      kn00033.53.4
      kn00044.54.4
      kn00055.55.4
      综合以上执行计划的结果可以看出,spark的类型提升会依赖于条件中数据类型。不同的执行计划最后得到的结果也不一样。


               3、HIVE:针对语句"select * from table where f1 > 3",我们来看看hive的执行计划和结果,如下,执行计划并没显示对f1字段的类型,但从结果可以看出,条件的类型被提升为浮点类型了。
        TableScan
        alias: upload_20211228102754
        filterExpr: (f1 > 3) (type: boolean)
        Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE
        Filter Operator
        predicate: (f1 > 3) (type: boolean)
        Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE
             Select Operator
        expressions: id (type: string), f1 (type: string), f2 (type: double)
        outputColumnNames: _col0, _col1, _col2
        Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE

        查询得到的结果:

        idf1f2
        kn00033.53.4
        kn00044.54.4
        kn00055.55.4
        4、HIVE:针对语句"select * from table where f1 > 3.0",我们再来看看hive的执行计划和结果,如下:
          TableScan
          alias: upload_20211228102754
          filterExpr: (f1 > 3.0) (type: boolean)
          Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
          predicate: (f1 > 3.0) (type: boolean)
          Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE
          Select Operator
          expressions: id (type: string), f1 (type: string), f2 (type: double)
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE

          查询得到的结果:

          idf1f2
          kn00033.53.4
          kn00044.54.4
          kn00055.55.4
          综合以上执行计划的结果可以看出,HIVE的类型提升不会依赖于条件中数据类型,并且类型都会被提升为浮点类型

          五、利用Catalyst解决SPARK在默认类型提升上与HIVE的不一致

          首先我们知道Spark支持catalyst的扩展点,以下是通过rule对执行计划进行特定的修改,目前存在类型提升的运算符组要有四个:>、>=、<、<=。我们在程序中需要去捕获这四类运算符,并且运算符的的左支的类型被强制转换为LongTypeIntegerType,右支为数字类型LongTypeIntegerType,我们需要强制将右的类型修改为DoubleType,这样就能保持和HIVE的一致性。

            import org.apache.spark.internal.Logging
            import org.apache.spark.sql.types.DataTypes
            import org.apache.spark.sql.catalyst.rules.Rule
            import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
            import org.apache.spark.sql.types.DataTypes.{IntegerType, LongType}
            import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
            import org.apache.spark.sql.catalyst.expressions.{GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual}


            /**
            *
            * 解决当字段为字符串,但数据为数字类型时,sql中若使用比较符号时,存在类型提升不足的问题。
            * 例如:字段f1为浮点型数据,但是条件给定为:f1>5,spark默认会解析为:cast(cast(f1 as string) as int) > 3
            * 这样就会损失精度,f1=3.3将不会满足条件,被过滤掉
            */
            object CastTypeOptimization extends Rule[LogicalPlan] with Logging{


            /**
            * 检查当前节点是否符合强制转换类型的规则节点
            * @param left 左节点
            * @param right 右节点
            * @return 是否满足条件
            */
            def checkRule(left: Expression, right: Expression): Boolean = {
            var flag = false
            if(
            (left.isInstanceOf[Cast] && left.dataType==LongType && right.isInstanceOf[Literal] && right.dataType==LongType)
            ||
            (left.isInstanceOf[Cast] && left.dataType==IntegerType && right.isInstanceOf[Literal] && right.dataType==IntegerType)
            ){
            flag = true
            }
            flag
            }


            /**
            * 修改强制转换的类型为DoubleType
            * @param plan 执行计划
            * @return 新的执行计划
            */
            override def apply(plan: LogicalPlan): LogicalPlan = {
            plan transformAllExpressions {
            case GreaterThan(left, right) if(checkRule(left, right)) => {
            val leftCastExp = Cast(left.children.head, DataTypes.DoubleType)
            GreaterThan(leftCastExp, right)
            }
            case LessThan(left, right) if(checkRule(left, right)) => {
            val leftCastExp = Cast(left.children.head, DataTypes.DoubleType)
            LessThan(leftCastExp, right)
            }
            case GreaterThanOrEqual(left, right) if(checkRule(left, right)) => {
            val leftCastExp = Cast(left.children.head, DataTypes.DoubleType)
            GreaterThanOrEqual(leftCastExp, right)
            }
            case LessThanOrEqual(left, right) if(checkRule(left, right)) => {
            val leftCastExp = Cast(left.children.head, DataTypes.DoubleType)
            LessThanOrEqual(leftCastExp, right)
            }
            }
            }


            }

              //添加Cast优化器
              sparkSession.experimental.extraOptimizations = Seq(CastTypeOptimization)

              现在使用spark再次执行分析语句:"select * from table where f1 > 3":

                == Physical Plan ==


                *(1) Project [id#10, f1#11, f2#12]


                +- *(1) Filter (isnotnull(f1#11) && (cast(f1#11 as double) > 3.0))

                过滤得到的结构为:

                idf1f2
                kn00033.53.4
                kn00044.54.4
                kn00055.55.4

                从结果可以看出,通过自定义的优化器,且在不变更程序的条件下,就可以让执行保持和hive的一致。

                文章转载自百科程序猿,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论