Spark (3)SparkSQL

SparkSQL——DataFrame、DataSet

概述

  • Spark用于结构化数据(structured data)处理的Spark模块
  • Shark基于Hive,SparkSQL基于Shark,SparkSQL兼容Hive,可以从RDD、parquet文件、JSON文件
  • 简化RDD开发,提供两个编程抽象(类似RDD):DataFrame、DataSet
  • 特点:
    • 易整合:无缝整合SQL查询和Spark编程
    • 统一的数据访问:相同的方式连接不同的数据源
    • 兼容Hive:已有的数据仓库上可以直接运行SQL、HiveQL
    • 标准数据连接:通过JDBC、ODBC连接
  • 写SQL语句时,自动将封装好的模型(DataFrame、DataSet)转换为RDD

  • DataFrame:

    • 以RDD为基础的分布式数据集,类似传统数据库的二维表格

    • 与RDD主要区别在于,DataFrame带有schema元信息(DataFrame所表示的二维表数据集的每一列都带有名称和类型),这使得Spark SQL对作用于DataFrame的变换进行针对性的优化,提升运行效率;RDD不知道所存数据元素的具体内部结构,Spark Core 只在stage层面进行简单、通用的流水线优化

    • DataFrame支持嵌套数据类型(struct、array 和 map)

    • DataFrame API提供高层的关系操作,比函数式的RDD API友好

      image-20220810130214714

    • 左侧RDD[Person]虽然以Person为类型参数,但本身不了解Person类的内部结构,DataFrame提供详细的结构信息,Spark SQL知道该数据集中包含哪些列,每列的名称和类型——DataFrame为数据提供Schema的视图

  • DataSet:
    • 分布式数据集合,是DataFrame的一个扩展
    • 可以使用功能性的转换(操作map,flatMap,filter等等)
    • 具有类型安全检查、DataFrame的查询优化特性
    • 用样例类对DataSet定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet中的字段名称
    • DataSet是强类型的,可以有 DataSet[Car]、DataSet[Person]
    • DataFrame是DataSet的特例,DataFrame=DataSet[T] ,因此可以通过as方法将DataFrame转换为DataSet

SparkSQL

SQL查询起点

  • Spark Core执行应用程序需要先构建上下文环境对象SparkContext,SparkSQL同样有上下文环境对象SparkSession——SparkSession内部封装SparkContext,计算实际上是由SparkContext完成

  • 使用spark shell时,spark自动创建一个名为spark的SparkSession对象(同样的,会自动创建一个名为sc的SparkContext)

    image-20220810134857239

  • 一个示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    scala> val df = spark.read.json("./user.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

    scala> df.show
    +---+--------+
    |age|username|
    +---+--------+
    | 20|zhangsan|
    | 30| lisi|
    +---+--------+

    scala> df.createTempView("user")

    scala> spark.sql("select * from user").show
    +---+--------+
    |age|username|
    +---+--------+
    | 20|zhangsan|
    | 30| lisi|
    +---+--------+

    scala> spark.sql("select avg(age) from user").show
    +--------+
    |avg(age)|
    +--------+
    | 25.0|
    +--------+

DataFrame

  • DataFrame API既有transformation操作也有action操作

创建DataFrame

  • SparkSession是创建DataFrame和执行SQL的入口
  • 创建DataFrame的三种方式:
    • 通过Spark数据源创建
      • Spark支持的数据源格式:csv format jdbc json load option options orc parquet schema table text textFile
      • 读取json文件创建DataFrame:val df = spark.read.json("data/user.json")
      • 文件读取时,数据字段按照字典序排序。例如,有字段username和age,即使数据源中age在username之后({"username": "thomas", "age": 10}),DataFrame的元数据中age会在username之前([age: bigint, username: string]
      • 如果从内存获取数据,数字默认为Int,但从文件中读取的数字会用bigint接收,bigint可以和Long转换,不能和Int转换
    • 从一个存在的RDD转换
    • 从Hive Table进行查询返回

SQL语法

  • 用SQL语法查询:SQL语法的查询必须要有临时视图或全局视图辅助——普通的临时表是Session范围内的,如果要在应用范围内有效,使用全局临时表。全局临时表需要全路径访问,如:global_temp.people

    • 对DataFrame创建一个临时视图(createOrReplaceTempView:防止多次创建导致重复),注意,视图和表不同,视图只能查询不能改,因为这个视图源于DataFrame

      1
      scala> df.createOrReplaceTempView("people")
    • 使用SQL查询(临时视图people只在当前的会话中有效,如果换一个连接则无法访问)

      1
      2
      3
      4
      5
      scala> val sqlDF = spark.sql("SELECT * FROM people")
      sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

      scala> spark.newSession().sql("SELECT * FROM people")
      报错,table or view not found
    • 创建一个全局视图

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      scala> df.createGlobalTempView("people")

      scala> spark.sql("SELECT * FROM global_temp.people").show()
      +---+--------+
      |age|username|
      +---+--------+
      | 20|zhangsan|
      | 30| lisi|
      | 40| wangwu|
      +---+--------+

DSL语法

特定领域语言(domain-specific language, DSL)(略)

RDD、DataFrame相互转换

  • RDD转DF:

    • RDD与DF、DS 之间互相操作需要引入 import spark.implicits._,这里spark是创建的SparkSession对象的变量名,即先创建SparkSession对象再导入。spark对象不能使用var声明,因为Scala只支持val修饰的对象的引入

    • spark-shell中无需导入,自动完成此操作

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      scala> val idRDD = sc.textFile("data/id.txt")
      scala> idRDD.toDF("id").show
      +---+
      | id|
      +---+
      | 1|
      | 2|
      | 3|
      | 4|
      +---+
    • 实际开发中,一般通过样例类将RDD转换为DataFrame

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      scala> case class User(name:String, age:Int)
      defined class User
      scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,
      t._2)).toDF.show
      +--------+---+
      | name|age|
      +--------+---+
      |zhangsan| 30|
      | lisi| 40|
      +--------+---+
  • DF转RDD

    • DataFrame是对RDD的封装,可以直接获取内部的RDD

      1
      2
      3
      4
      5
      6
      7
      8
      scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF
      df: org.apache.spark.sql.DataFrame = [name: string, age: int]

      scala> val rdd = df.rdd
      rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25

      scala> val array = rdd.collect
      array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])
    • 此时RDD的存储类型为Row

      1
      2
      3
      4
      5
      6
      7
      8
      scala> array(0)
      org.apache.spark.sql.Row = [zhangsan,30]

      scala> array(0)(0)
      Any = zhangsan

      scala> array(0).getAs[String]("name")
      String = zhangsan

DataSet

  • 强类型的数据集合,需要提供类型信息——DataFrame可以叫DataSet[Row],每一行类型是Row,不解析,每一行有哪些字、各个字段是什么类型都无从得知

创建DataSet

  • 样例类序列创建DataSet

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    scala> case class Person(name: String, age: Long)
    defined class Person

    scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]

    scala> caseClassDS.show
    +---------+---+
    | name|age|
    +---------+---+
    | zhangsan| 2|
    +---------+---+
  • Scala基本类型的序列创建DataSet

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    scala> val ds = Seq(1,2,3,4,5).toDS
    ds: org.apache.spark.sql.Dataset[Int] = [value: int]
    scala> ds.show
    +-----+
    |value|
    +-----+
    | 1|
    | 2|
    | 3|
    | 4|
    | 5|
    +-----+
  • 实际使用中,很少将序列转换成DataSet,更多通过RDD得到DataSet

RDD、DataSet相互转换

  • RDD转换DataSet

    • 自动将包含有case class的RDD转换成 DataSet

    • case class定义了table的结构,case class属性通过反射变成表的列名

    • case class可以包含诸如Seq或者Array等复杂的结构

      1
      2
      3
      4
      5
      scala> case class User(name:String, age:Int)
      defined class User

      scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
      org.apache.spark.sql.Dataset[User] = [name: string, age: int]
  • DataSet转换RDD:DataSet也是对RDD的封装

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    scala> case class User(name:String, age:Int)
    defined class User

    scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
    org.apache.spark.sql.Dataset[User] = [name: string, age: int]

    scala> val rdd = res11.rdd
    rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25

    scala> rdd.collect
    Array[User] = Array(User(zhangsan,30), User(lisi,49))

DataFrame、DataSet转换

  • DataFrame是DataSet的特例

  • DataFrame转换DataSet

    1
    2
    3
    4
    5
    6
    7
    8
    9
    scala> case class User(name:String, age:Int)
    defined class User

    scala> val df = sc.makeRDD(List(("zhangsan",30),
    ("lisi",49))).toDF("name","age")
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]

    scala> val ds = df.as[User]
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
  • DataSet转换DataFrame

    1
    2
    3
    4
    5
    scala> val ds = df.as[User]
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

    scala> val df = ds.toDF
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]

RDD、DataFrame、DataSet三者关系

  • DataSet有可能逐步取代RDD和DataFrame成为唯一的API接口
  • 共性:
    • spark平台下的分布式弹性数据集
    • 都有惰性机制,在创建、转换(如map方法)时不会立即执行,只有遇到Action方法时,才会开始运算
    • 会根据Spark的内存情况自动缓存运算——不会内存溢出
    • DataFrame和DataSet均可使用模式匹配,获取各个字段的值和类型
  • 区别:
    • DataFrame每一行的类型固定为Row,每一列的值无法直接访问,只有通过getAs、模式匹配获得特定字段
    • Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同
  • 相互转换

image-20220810143317316

IDEA开发SparkSQL

  • 添加依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
    </dependency>
  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    package TestSparkSQL

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

    object Demo {
    def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")

    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._ //spark 不是包名,是上下文环境对象名
    val df: DataFrame = spark.read.json("src/main/resources/testSql.json")
    df.show()

    //SQL 风格语法
    df.createOrReplaceTempView("user")
    spark.sql("select avg(age) from user").show

    //*****RDD=>DataFrame=>DataSet*****
    val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(
    List((1, "zhangsan", 30), (2, "lisi", 28), (3, "wangwu", 20))
    )
    val df1: DataFrame = rdd1.toDF("id", "name", "age")
    df1.show()
    val ds1: Dataset[User] = df1.as[User]
    ds1.show()

    //*****DataSet=>DataFrame=>RDD*****
    val df2: DataFrame = ds1.toDF()
    //返回的 RDD 类型为 Row,getXXX 方法获取字段值,类似 jdbc 处理结果集,索引从 0 开始
    val rdd2: RDD[Row] = df2.rdd
    rdd2.foreach(a => println(a.getString(1)))

    rdd1.map {
    case (id, name, age) => User(id, name, age)
    }.toDS()
    //*****DataSet=>=>RDD*****
    val rdd3 = ds1.rdd
    rdd3.foreach(a => println(a.id + "\t" + a.name + "\t" + a.age))
    //释放资源
    spark.stop()
    }

    case class User(id: Int, name: String, age: Int)

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    +---+--------+
    |age|username|
    +---+--------+
    | 15| Thomas|
    | 20| David|
    | 30| Bob|
    +---+--------+

    +------------------+
    | avg(age)|
    +------------------+
    |21.666666666666668|
    +------------------+

    +---+--------+---+
    | id| name|age|
    +---+--------+---+
    | 1|zhangsan| 30|
    | 2| lisi| 28|
    | 3| wangwu| 20|
    +---+--------+---+

    +---+--------+---+
    | id| name|age|
    +---+--------+---+
    | 1|zhangsan| 30|
    | 2| lisi| 28|
    | 3| wangwu| 20|
    +---+--------+---+

    lisi
    wangwu
    zhangsan
    3 wangwu 20
    1 zhangsan 30
    2 lisi 28

用户自定义函数

  • 额外的功能补充

  • 注册、应用udf(给字段username添加一个前缀为例)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    spark.udf.register("addPrefix", (x:String) => "Name:"+x)
    spark.sql("Select addPrefix(username), age from people").show()

    +-------------------+---+
    |addPrefix(username)|age|
    +-------------------+---+
    | Name:Thomas| 15|
    | Name:David| 20|
    | Name:Bob| 30|
    +-------------------+---+
  • UDAF(用户自定义聚合函数)

    • 强类型的Dataset和弱类型的DataFrame都提供相关的聚合函数,如count()max()

    • 继承 UserDefinedAggregateFunction实现用户自定义弱类型聚合函数

    • Spark3.0后,UserDefinedAggregateFunction不推荐使用,可以统一采用强类型聚合函数Aggregator

    • 以计算平均年龄为例

      • RDD:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        val res: (Int, Int) = data.map {
        case (name, age) =>
        (age, 1)
        }.reduce {
        (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
        }
        }
        println(res._1 / res._2)
      • UDAF弱类型

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
          import spark.implicits._ //spark 不是包名,是上下文环境对象名

        val dataFrame = data.toDF("username", "age")
        dataFrame.createOrReplaceTempView("user")
        spark.udf.register("ageAvg", new MyAvgUDAF())
        spark.sql("select ageAvg(age) from user").show()
        ...

        class MyAvgUDAF extends UserDefinedAggregateFunction {
        // 输入数据的结构: 这里只有年龄,Int
        override def inputSchema: StructType = {
        StructType(Array(StructField("age", LongType)))
        }

        // 缓冲区数据的结构: 临时的结构
        override def bufferSchema: StructType = {
        StructType(
        Array(StructField("total", LongType), StructField("count", LongType)
        )
        )
        }

        // 函数计算结果的数据类型:Out
        override def dataType: DataType = LongType

        // 函数的稳定性
        override def deterministic: Boolean = true

        // 缓冲区初始化
        override def initialize(buffer: MutableAggregationBuffer): Unit = {
        // 计算均值,需要cnt和sum
        buffer.update(0, 0L)
        buffer.update(1, 0L)
        }

        // 根据输入的数据更新缓冲区数据
        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer.update(0, buffer.getLong(0) + input.getLong(0))
        buffer.update(1, buffer.getLong(1) + 1)
        }

        // 缓冲区数据合并
        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
        buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
        }

        // 计算平均值
        override def evaluate(buffer: Row): Any = {
        buffer.getLong(0) / buffer.getLong(1)
        }
        }
      • UDAF强类型(也可以用DSL语法来做,这里略),IN:输入数据类型、BUF:缓冲区数据类型、OUT:输出数据类型

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        case class Buff(var total: Long, var count: Long)

        class AvgUDAFStrong extends Aggregator[Long, Buff, Long] { // [In, Buff, Out]
        // 缓冲区的初始化
        override def zero: Buff = {
        Buff(0L, 0L)
        }

        // 根据输入的数据更新缓冲区的数据
        override def reduce(buff: Buff, in: Long): Buff = {
        buff.total = buff.total + in
        buff.count = buff.count + 1
        buff
        }

        // 合并缓冲区
        override def merge(buff1: Buff, buff2: Buff): Buff = {
        buff1.total = buff1.total + buff2.total
        buff1.count = buff1.count + buff2.count
        buff1
        }

        //计算结果
        override def finish(buff: Buff): Long = {
        buff.total / buff.count
        }

        // 缓冲区的编码操作,用于序列化,一般是固定写法
        override def bufferEncoder: Encoder[Buff] = Encoders.product

        // 输出的编码操作
        override def outputEncoder: Encoder[Long] = Encoders.scalaLong
        }
        ...
        dataFrame.createOrReplaceTempView("user")
        spark.udf.register("ageAvg", functions.udaf(new AvgUDAFStrong()))
        spark.sql("select ageAvg(age) from user").show()

数据加载、保存

  • 使用相同的API,根据不同的参数读取、保存不同格式的数据(默认读取和保存的文件格式 为parquet)
  • 加载数据:spark.read.format("…")[.option("…")].load("…")
    • format:指定加载的数据类型,csv、jdbc、json、orc、parquet、textFile
    • load:传入加载数据路径
    • option:jdbc格式下,传入JDBC的相应参数,url、user、password、dbtable
  • 保存数据:dataframe.write.format("…")[.option("…")].save("…")
    • format:指定保存的数据类型,csv、jdbc、json、orc、parquet、textFile
    • load:传入保存数据的路径
    • option:jdbc格式下,传入JDBC的相应参数,url、user、password、dbtable
    • 设置save mode(不是原子操作,没有加锁):df.write.mode("append").json("/opt/module/data/output"),mode包括error、append、overwrite、ignore

Parquet

  • Parquet是能存储嵌套数据的列式存储格式,数据源为Parquet文件时上述加载、保存操作不需要使用format——修改配置项spark.sql.sources.default,可修改默认数据源格式
  • 加载:val df = spark.read.load("...")
  • 保存:df.write.mode("append").save("...")

Json

  • SparkSQL自动推断Json数据集结构,加载为Dataset[Row]——Spark读取的JSON文件不是传统的JSON文件,每一行是一个JSON串

    1
    2
    {"name":"Michael"}
    [{"name":"Justin""age":19},{"name":"Justin""age":19}]
  • 加载:spark.read.json(path)

CSV

  • CSV文件第一行为字段名:spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").option("header", "true").load("data/user.csv")

MySQL

  • 导入依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
    </dependency>
  • 数据读取

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    val ds: Dataset[User2] = rdd.toDS

    //方式 1:通用的 load 方法读取
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://linux1:3306/spark-sql")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "root")
    .option("password", "123123")
    .option("dbtable", "user")
    .load().show

    //方式 2:通用的 load 方法读取 参数另一种形式
    spark.read.format("jdbc")
    .options(
    Map(
    "url"->"jdbc:mysql://linux1:3306/spark-sql?user=root&password=123123",
    "dbtable"->"user","driver"->"com.mysql.jdbc.Driver"
    )
    ).load().show

    //方式 3:使用 jdbc 方法读取
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123123")
    val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
  • 数据保存

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    //方式 1:通用的方式 format 指定写出类型
    ds.write
    .format("jdbc")
    .option("url", "jdbc:mysql://linux1:3306/spark-sql")
    .option("user", "root")
    .option("password", "123123")
    .option("dbtable", "user")
    .mode(SaveMode.Append)
    .save()

    //方式 2:通过 jdbc 方法
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123123")
    ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)

Hive

略(若要把Spark SQL连接到一个部署好的Hive上,要把hive-site.xml复制到Spark配置文件目录)