暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

一文学会Spark Sql

BigData Scholar 2021-06-29
230


3.1 概述

是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用

3.1.1 DataFrame

1)DataFrame也是一个分布式数据容器。更像传统数据库的二维表格,除了数据以外,还记录schema

2)RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而DataFrame却提供了详细的结构信息,以把它当做数据库中的一张表

3)查询性能比RDD高,查询计划通过Spark catalyst optimiser进行优化

3.1.2 DataSet

1)是Dataframe API的一个扩展,是Spark最新的数据抽象。

2)Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。

3)样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。

4) Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。

5)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].

6)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比

3.2 编程

3.2.1 SparkSession

Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext(老版本)的组合

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSql")
//创建SparkSession对象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

3.2.2 DataFrame

3.2.2.1 创建

1)从数据源创建

spark.read.支持:csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")

2)从Hive Table查询(后面总结)

3)RDD转换(后面总结)

3.2.2.2 Sql风格语法(主要)

1)创建表

临时表

df.createOrReplaceTempView("people")

全局表

df.createGlobalTempView("people")

2)Sql查询

临时表

val sqlDF = spark.sql("SELECT * FROM people").show()

全局表

spark.sql("SELECT * FROM global_temp.people").show()

结果展示

sqlDF.show

3)Dsl语法风格(次要)

查看schema

df.printSchema

查看列

df.select("name").show()

运算统计

df.select($"name",$"age"+1).show()
df.select($"age">2).show()
df.groupBy("age").count().show()

3.2.2.3 与RDD互转

注意:RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._

3.2.2.3.1 RDD转DataFrame(.toDF)

1)通过手动确定转换

import spark.implicits._
val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD.map{line=>val para = line.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")

2)通过反射确定(样例类)

case class People(name:String, age:Int)
peopleRDD.map{ line => val para = line.split(",");People(para(0),para(1).trim.toInt)}.toDF

3)通过编程的方式(了解)

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//创建schema
val structType: StructType = StructType(StructField("name"StringType) :: StructField("age"IntegerType) :: Nil)
//根据给定的类型创建二元组RDD
val data = peopleRDD.map{ line => val para = line.split(",");Row(para(0),para(1).trim.toInt)}
//根据数据和schama创建
val dataFrame = spark.createDataFrame(data, structType)

3.2.2.3.2 DataFrame转RDD(.rdd)
df.rdd

3.2.3 DataSet

3.2.3.1 创建

//创建样例类
case class Person(name: String, age: Long)
//创建DS
val ds = Seq(Person("woodie",22)).toDS()

3.2.3.2 与RDD互转

3.2.3.2.1 RDD转DataSet(.toDS)
//创建RDD
val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
//创建样例类
case class Person(name: String, age: Long)
//转换成DS
val ds = peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS()

3.2.3.2.2 DataSet转RDD(.rdd)
ds.rdd

3.2.3.2.3 与DataFrame互转

1)DataFrame转DataSet(.as[样例类])

//创建DF
val df = spark.read.json("examples/src/main/resources/people.json")
//创建样例类
case class Person(name: String, age: Long)
//转化为DS
val ds = df.as[Person]

2)DataSet转DataFrame(.toDF)

ds.toDF

3.2.4 RDD/DF/DS区别与联系

3.2.4.1 共性

1)RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利

2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算。

3)三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。

4)三者都有partition的概念

5)三者有许多共同的函数,如filter,排序等

6)在对DataFrame和Dataset进行操作许多操作都需要import spark.implicits._

7)DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

3.2.4.2 区别

3.2.4.2.1 RDD

1)RDD一般和spark mlib同时使用

2)RDD不支持sparksql操作

3.2.4.2.2 DF

1)与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值

df.foreach{
  line =>
  val col1=line.getAs[String]("name")
  val col2=line.getAs[Long]("age")
  println(col1+":"+col2)
}

2)DataFrame与Dataset一般不与spark mlib同时使用

3)DataFrame与Dataset均支持sparksql的操作

4)DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头

//保存
val saveoptions = Map("header" -> "true""delimiter" -> "\t""path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.woodie.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//读取
val options = Map("header" -> "true""delimiter" -> "\t""path" -> "hdfs://hadoop102:9000/test")
val datarDF= spark.read.options(options).format("com.woodie.spark.csv").load()

3.2.4.2.3 DS

1)Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。

2)DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

ds.map{
  line=>
  println(line.col1)
  println(line.col2)
}

3.3 UDF

3.3.1 自定义UDF函数

spark.udf.register("addName", (x:String)=> "Name:"+x)
df.createOrReplaceTempView("people")
spark.sql("Select addName(name), age from people").show()

3.3.2 自定义聚合函数

object Spark16_Udaf {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Udaf")
    //创建SparkSession对象
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    //创建DataFrame
    val df: DataFrame = spark.read.json("/usr/local/spark/examples/src/main/resources/people.json")
    //创建表
    df.createOrReplaceTempView("people")
    //创建聚合函数对象
    val avgFuc = new AvgFuc
    //注册函数
    spark.udf.register("avgFuc",avgFuc)
    spark.sql("select avgFuc(age) from people").show()
    spark.stop()
  }

}
//用户自定义聚合函数(平均年龄)
class AvgFuc extends UserDefinedAggregateFunction{
  //输入的数据结构
  override def inputSchemaStructType = {
    new StructType().add("age",LongType)
  }
  //计算时缓冲区的数据结构
  override def bufferSchemaStructType = {
    new StructType().add("sum",LongType).add("count",LongType)
  }
  //函数返回的数据类型
  override def dataTypeDataType = DoubleType
  //函数是否稳定性
  override def deterministicBoolean = true
  //计算前缓冲区的初始化值
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  //根据查询结果更新缓冲区
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if(!input.isNullAt(0)){
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  //多个节点缓冲区合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}

3.4 数据源

3.4.1 通用加载/保存方法

3.4.1.1 手动指定选项

1)Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作

val df = spark.read.load("examples/src/main/resources/users.parquet"
df.select("name""favorite_color").write.save("namesAndFavColors.parquet")

2)数据源格式不是parquet格式文件时,需要手动指定数据源的格式(内置格式:json, parquet, jdbc, orc, libsvm, csv, text)

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.write.format("parquet").save("hdfs://hadoop001:9000/namesAndAges.parquet")
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs:// hadoop001:9000/namesAndAges.parquet`")

3.4.1.2 文件保存选项

采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式

3.4.1.3 JDBC

1)加载数据

val jdbcDF = spark.read
.format("jdbc")
.option("url""jdbc:mysql://hadoop001:3306/rdd")
.option("dbtable""rddtable")
.option("user""root")
.option("password""000000")
.load()

val connectionProperties = new Properties()
connectionProperties.put("user""root")
connectionProperties.put("password""000000")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://hadoop001:3306/rdd""rddtable", connectionProperties)

2)写入数据

jdbcDF.write
.format("jdbc")
.option("url""jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable""dftable")
.option("user""root")
.option("password""000000")
.save()

jdbcDF2.write
.jdbc("jdbc:mysql://hadoop102:3306/rdd""db", connectionProperties)

3.4.1.4 Hive

1)内嵌Hive

直接使用

2)外部Hive

a)将Hive中的hive-site.xml拷贝或者软连接到Spark安装目录下的conf目录下。

b)spark shell:注意带上访问Hive元数据库的JDBC客户端

c)IDEA:hive-site.xml添加到ClassPath下

val warehouseLocation: String = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()



文章转载自BigData Scholar,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论