SparkSQL快速使用

By timebusker on July 15, 2018

SparkSQL常用内置配置项

SparkSQL

Spark SQL是Spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。

作用:提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎

DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件Hive中的表外部的关系型数据库,以及RDD

原理:将Spark SQL转化为RDD,然后提交到集群执行

特点:

  • 容易整合
  • 统一的数据访问方式
  • 兼容 Hive
  • 标准的数据连接

SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

在Spark的早期版本中,由于RDD是主要的API,SparkContext是spark的主要切入点,我们可以通过Sparkcontext来创建和操作RDD。但这样,对于每个其他的API,我们需要使用不同的context。 例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用HiveContext。

随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在Spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点, SparkSession封装了SparkConf、SparkContext和SQLContext,为了向后兼容,SQLContext和HiveContext也被保存下来。

SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。 SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的。

特点:

  • 为用户提供一个统一的切入点使用Spark各项功能
  • 允许用户通过它调用DataFrameDataset相关API来编写程序
  • 减少了用户需要了解的一些概念,可以很容易的与Spark进行交互
  • Spark交互之时不需要显示的创建SparkConf, SparkContext以及SQlContext,这些对象已经封闭在SparkSession

DataFrames

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格

DataFrames

DataFrame与RDD的主要区别在于,DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息, 从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。 反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

RDD转换成为DataFrame

数据文件

tom,12
sandy,34
alixe,23
marry,31
  • 通过case class创建DataFrames(反射)
//定义case class,相当于表结构
case class People(var name:String,var age:Int)
object TestDataFrame {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val context = new SQLContext(sc)
    // 将本地的数据读入 RDD, 并将 RDD 与 case class 关联
    val peopleRDD = sc.textFile("E:\\test\\data\\people.txt").map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
    import context.implicits._
    // 将RDD 转换成 DataFrames
    val df = peopleRDD.toDF
    //将DataFrames创建成一个临时的视图
    df.createOrReplaceTempView("view_people")
    //使用SQL语句进行查询
    context.sql("select * from view_people").show()
  }
}
  • 通过structType创建DataFrames(编程接口)
object TestDataFrame {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val fileRDD = sc.textFile("E:\\test\\data\\people.txt")
    // 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
    val rowRDD: RDD[Row] = fileRDD.map(line => {
            val fields = line.split(",")
            Row(fields(0), fields(1).trim.toInt)
        })
    // 创建 StructType 来定义结构
    val structType: StructType = StructType(
      //字段名,字段类型,是否可以为空
      StructField("name", StringType, true) ::
      StructField("age", IntegerType, true) :: Nil
    )
    /**
      * rows: java.util.List[Row],
      * schema: StructType
      * */
    val df:DataFrame = sqlContext.createDataFrame(rowRDD,structType)
    df.createOrReplaceTempView("view_people")
    sqlContext.sql("select * from view_people").show()
  }
}
  • 通过json文件创建DataFrames
object TestDataFrame3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df: DataFrame = sqlContext.read.json("E:\\test\\data\\people.json")
    df.createOrReplaceTempView("view_people")
    sqlContext.sql("select * from view_people").show()
  }
}

DataFrame文件读取与保存

  • 读取
object TestRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DataFrameRead").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //方式一(指定格式读)
    val df1 = sqlContext.read.json("E:\\test\\data\\people.json")
    val df2 = sqlContext.read.parquet("E:\\test\\data\\users.parquet")
    //方式二(设置格式读)
    val df3 = sqlContext.read.format("json").load("E:\\test\\data\\people.json")
    val df4 = sqlContext.read.format("parquet").load("E:\\test\\data\\users.parquet")
    //方式三,默认是parquet格式(默认格式读)
    val df5 = sqlContext.load("E:\\test\\data\\users.parquet")
  }
}
  • 保存
object TestSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DataFrameSave").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df1 = sqlContext.read.json("E:\\test\\data\\people.json")
    //方式一(指定格式写)
    df1.write.json("E:\\test\\data\\success\\111")
    df1.write.parquet("E:\\test\\data\\success\\222")
    //方式二(设置格式写)
    df1.write.format("json").save("E:\\test\\data\\success\\333")
    df1.write.format("parquet").save("E:\\test\\data\\success\\444")
    //方式三(默认格式写)
    df1.write.save("E:\\test\\data\\success\\555")
  }
}
  • 保存模式 即数据保存目录的追加覆盖忽略存在抛异常四种模式。

DataFrames

数据源

  • 读取JSON数据文件计算

  • 读取parquet数据文件计算

  • 读取MySQL表数据计算

object TestMySQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestMySQL").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
    val table = "tb_user"
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","root")
    //需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)
    val df = sqlContext.read.jdbc(url,table,properties)
    df.createOrReplaceTempView("view_user")
    sqlContext.sql("select * from view_user").show()

  }
}
  • 读取Hive表数据计算

开发前,需要集群整合SparkSQL和Hive,可用

引入Hive依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.3.0</version>
</dependency>

拷贝集群hive-site.xml到工程resources目录下

测试代码

object TestHive {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("select * from myhive.student").show()
  }
}