目录

 一、环境准备

(一)导入依赖

(二)创建SparkSQL的运行环境

二、DataFrame

(一)创建DataFrame

(二)SQL语法

1.首先,查询要有表名,我们要给这个二维表创建临时表并命名

2.对指定表进行SQL查询

3.创建全局临时表(全局临时视图)

(三)DSL语法

1.DSL语法简介

2.DataFrame中的API

3.DSL使用案例

4.RDD与DataFrame的相互转化

三、DataSet

(一)创建DataSet

(二)DataSet与DataFrame互相转换

1.DataFrame转DataSet=>as[样例类]

2.DataSet转DataFrame=>toDF()

(三)RDD与DataSet互相转换

1.RDD转DataSet=>样例类RDD.toDS()

2.DataSet转RDD=>.rdd

四、IDEA 开发 SparkSQL

1.添加依赖

2.创建SparkSQL运行环境

3.创建DataFrame并用SQL和DSL读取

一、环境准备

(一)导入依赖

UTF-8

1.8

1.8

3.1.2

8.0.29

junit

junit

4.11

org.apache.spark

spark-core_2.12

${spark.version}

org.apache.spark

spark-sql_2.12

${spark.version}

org.apache.spark

spark-hive_2.12

${spark.version}

org.apache.spark

spark-graphx_2.12

${spark.version}

mysql

mysql-connector-java

${mysql.version}

com.alibaba

fastjson

1.2.62

(二)创建SparkSQL的运行环境

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("dateset").getOrCreate()

val sc: SparkContext = spark.sparkContext

import spark.implicits._

二、DataFrame

        Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者

生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。

(一)创建DataFrame

// 读取文件的几种方法

val df: DataFrame = spark.read.json("in/user.json")

df.show()

spark.read.format("json").option("header","true").load("in/user.json").show()

spark.read.format("json").option("header","false").load("in/user.json").show()

##########################################################################

运行结果:

+---+---+--------+

|age| id| name|

+---+---+--------+

| 21| 1|zhangsan|

| 22| 2| lisi|

| 23| 3| wangwu|

| 23| 4| zl|

+---+---+--------+

(二)SQL语法

从上面的结果可以看出,文件中的数据转化为结构式二维表,接下来,对表中的数据进行查询:

1.首先,查询要有表名,我们要给这个二维表创建临时表并命名

// 将查询结果集转为视图——临时表

// view只能查询,不能修改

df.createOrReplaceTempView("user")

2.对指定表进行SQL查询

// 使用sql语句进行查询

spark.sql("select * from user").show()

/*

+---+---+--------+

|age| id| name|

+---+---+--------+

| 21| 1|zhangsan|

| 22| 2| lisi|

| 23| 3| wangwu|

| 23| 4| zl|

+---+---+--------+*/

spark.sql("select avg(age) from user").show()

/*

+--------+

|avg(age)|

+--------+

| 22.25|

+--------+*/

注意:普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。 使用全局临时表时需要全路径访问,如:global_temp.people

// 接下来我们换一个Session,上面创建的user临时表就无法实现

// spark.newSession().sql("select * from user").show()// 错误写法

3.创建全局临时表(全局临时视图)

查询时要写全路径:global_temp.+表名

// 因此,我们要将临时表设置为全局

df.createGlobalTempView("user")

// 并且在对全局的临时表查询时,表名前要写全路径:global_temp.+表名

spark.newSession().sql("select * from global_temp.user").show()

+---+---+--------+

|age| id| name|

+---+---+--------+

| 21| 1|zhangsan|

| 22| 2| lisi|

| 23| 3| wangwu|

| 23| 4| zl|

+---+---+--------+

spark.sql("select * from global_temp.user").show()

+--------+

|avg(age)|

+--------+

| 22.25|

+--------+

(三)DSL语法

1.DSL语法简介

        DataFrame 提供一个特定领域语言(DSL)去管理结构化的数据。 可以在 Scala, Java, Python 和 R 中使用 DSL,使用DSL语法风格不必去创建临时视图了。

println("--------------------DSL语法------------------------")

df.printSchema()

root

|-- age: long (nullable = true)

|-- id: long (nullable = true)

|-- name: string (nullable = true)

2.DataFrame中的API

scala> df.

agg foreachPartition schema

alias groupBy select

apply groupByKey selectExpr

as head show

cache hint sort

checkpoint inputFiles sortWithinPartitions

coalesce intersect sparkSession

col intersectAll sqlContext

colRegex isEmpty stat

collect isLocal storageLevel

collectAsList isStreaming summary

columns javaRDD tail

count join take

createGlobalTempView joinWith takeAsList

createOrReplaceGlobalTempView limit toDF

createOrReplaceTempView localCheckpoint toJSON

createTempView map toJavaRDD

crossJoin mapPartitions toLocalIterator

cube na toString

describe observe transform

distinct orderBy union

drop persist unionAll

dropDuplicates printSchema unionByName

dtypes queryExecution unpersist

encoder randomSplit where

except randomSplitAsList withColumn

exceptAll rdd withColumnRenamed

explain reduce withWatermark

explode registerTempTable write

filter repartition writeStream

first repartitionByRange writeTo

flatMap rollup

foreach sample

3.DSL使用案例

// select方法:查询指定列

df.select("age").show()

+---+

|age|

+---+

| 21|

| 32|

| 23|

| 40|

+---+

———————————————————————————————————————————————————————————————————————————————————————————

// 注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名

df.select($"age" + 1).show()

+---------+

|(age + 1)|

+---------+

| 22|

| 33|

| 24|

| 41|

+---------+

// 或者

df.select('name, 'age + 1).show()

+--------+---------+

| name|(age + 1)|

+--------+---------+

|zhangsan| 22|

| lisi| 33|

| wangwu| 24|

| zl| 41|

+--------+---------+

———————————————————————————————————————————————————————————————————————————————————————————

// 对运算的列重命名

df.select('name, 'age + 1 as "newAge").show()

+--------+------+

| name|newAge|

+--------+------+

|zhangsan| 22|

| lisi| 33|

| wangwu| 24|

| zl| 41|

+--------+------+

———————————————————————————————————————————————————————————————————————————————————————————

// filter方法:过滤

// 查询年龄>30

df.filter($"age" > 30).show()

+---+---+----+

|age| id|name|

+---+---+----+

| 32| 2|lisi|

| 40| 4| zl|

+---+---+----+

———————————————————————————————————————————————————————————————————————————————————————————

// 按照年龄进行分组,查看数据条数

df.groupBy("age").count().show()

+---+-----+

|age|count|

+---+-----+

| 32| 1|

| 21| 1|

| 23| 1|

| 40| 1|

+---+-----+

4.RDD与DataFrame的相互转化

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("dateset").getOrCreate()

val sc: SparkContext = spark.sparkContext

import spark.implicits._

// 创建一个rdd

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

rdd.collect().foreach(println)

/*

1

2

3

4*/

print("-----------------------------rdd转为DataFrame=>toDF()-----------------------------")

val df: DataFrame = rdd.toDF("id")

df.show()

/*

+---+

| id|

+---+

| 1|

| 2|

| 3|

| 4|

+---+*/

print("-----------------------------DataFrame转为rdd=>rdd-----------------------------")

df.rdd.collect().foreach(println)

/*

[1]

[2]

[3]

[4]*/

df.rdd.collect().foreach(x => println(x.mkString(",")))

/*

1

2

3

4*/

三、DataSet

DataSet 是具有强类型的数据集合,需要提供对应的类型信息。

(一)创建DataSet

main{

......

// 1.使用样例类序列创建 DataSet

// 样例类传参

val caseClassDS: Dataset[Person] = Seq(Person("zhangsan", 2)).toDS()

caseClassDS.show()

+--------+---+

| name|age|

+--------+---+

|zhangsan| 2|

+--------+---+

// 2.使用基本类型的序列创建 DataSet

// 注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

val ds: Dataset[Int] = Seq(7, 8, 45, 12, 45).toDS()

ds.show()

+-----+

|value|

+-----+

| 7|

| 8|

| 45|

| 12|

| 45|

+-----+

sc.stop()

}

// 创建样例类

// 注意:样例类要写在main方法的外面!!!

case class Person(name: String, age: Long)

样例类的好处:

Scala中在class关键字前加上case关键字 这个类就成为了样例类,样例类和普通类区别: (1)不需要new可以直接生成对象 (2)默认实现序列化接口 (3)默认自动覆盖 toString()、equals()、hashCode()

(二)DataSet与DataFrame互相转换

1.DataFrame转DataSet=>as[样例类]

main{

......

val df1: DataFrame = spark.read.json("in/user.json")

df1.show()

// as[样例类]

val ds1: Dataset[Emp] = df1.as[Emp]

ds1.show()

+---+---+--------+

|age| id| name|

+---+---+--------+

| 21| 1|zhangsan|

| 32| 2| lisi|

| 23| 3| wangwu|

| 40| 4| zl|

+---+---+--------+

}

// 定义样例类

case class Emp(age:Long,id:Long,name:String)

2.DataSet转DataFrame=>toDF()

// toDF()

ds1.toDF().show()

+---+---+--------+

|age| id| name|

+---+---+--------+

| 21| 1|zhangsan|

| 32| 2| lisi|

| 23| 3| wangwu|

| 40| 4| zl|

+---+---+--------+

(三)RDD与DataSet互相转换

1.RDD转DataSet=>样例类RDD.toDS()

case class Emp(age:Long,id:Long,name:String)// 定义在main方法外

// 样例类RDD.toDS()

val ds2: Dataset[Emp] = rdd1.toDS()

ds2.show()

+---+---+---------+

|age| id| name|

+---+---+---------+

| 30| 1| xioaming|

| 20| 2|xiaozhang|

+---+---+---------+

2.DataSet转RDD=>.rdd

ds2.rdd.collect().foreach(println)

--------------------------------------

Emp(30,1,xioaming)

Emp(20,2,xiaozhang)

四、IDEA 开发 SparkSQL

1.添加依赖

org.apache.spark

spark-sql_2.12

3.0.0

2.创建SparkSQL运行环境

// TODO 创建SparkSQL的运行环境

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")

val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

3.创建DataFrame并用SQL和DSL读取

// 创建DataFrame

val df: DataFrame = spark.read.json("datas/user.json")

df.show()

+---+--------+

|age|username|

+---+--------+

| 21|zhangsan|

| 32| lisi|

| 23| wangwu|

| 40| zl|

+---+--------+

// 两种方式读取DataFrame

println("--------------------DataFrame=>SQL-------------------------")

// 创建临时视图

df.createOrReplaceTempView("user")

// SQL语句访问视图

spark.sql("select * from user").show()

+---+--------+

|age|username|

+---+--------+

| 21|zhangsan|

| 32| lisi|

| 23| wangwu|

| 40| zl|

+---+--------+

spark.sql("select age,username from user").show()

+---+--------+

|age|username|

+---+--------+

| 21|zhangsan|

| 32| lisi|

| 23| wangwu|

| 40| zl|

+---+--------+

spark.sql("select avg(age) as avg_score from user").show()

+---------+

|avg_score|

+---------+

| 29.0|

+---------+

println("------------------DataFrame =>DSL--------------------------)

// 在使用DataFrame时,如果涉及到转换操作,需要引入转换规则

// import spark.implicits._

df.select("username", "age").show()

+--------+---+

|username|age|

+--------+---+

|zhangsan| 21|

| lisi| 32|

| wangwu| 23|

| zl| 40|

+--------+---+

df.select($"age" + 1).show()

+---------+

|(age + 1)|

+---------+

| 22|

| 33|

| 24|

| 41|

+---------+

df.select('username, 'age / 2 as "newAge").show()

+--------+------+

|username|newAge|

+--------+------+

|zhangsan| 10.5|

| lisi| 16.0|

| wangwu| 11.5|

| zl| 20.0|

+--------+------+

// TODO 创建DataSet

// DataFrame其实是特定泛型的DataSet

val seq = Seq(1, 2, 3, 4)

val ds: Dataset[Int] = seq.toDS()

ds.show()

+-----+

|value|

+-----+

| 1|

| 2|

| 3|

| 4|

+-----+

println("---------------- RDD <=> DataFrame--------------")

// TODO RDD转DataFrame

val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 35)))

val df1: DataFrame = rdd.toDF("id", "name", "age")

// TODO DataFrame 转 RDD

val rowRDD: RDD[Row] = df1.rdd

rowRDD.collect().foreach(println)

[1,zhangsan,30]

[2,lisi,35]

println("----------------DataFrame <=> DataSet--------------")

// TODO DataFrame 转 DataSet

val ds2: Dataset[User] = df1.as[User]// 样例类

ds2.show()

+---+--------+---+

| id| name|age|

+---+--------+---+

| 1|zhangsan| 30|

| 2| lisi| 35|

+---+--------+---+

// TODO DataSet 转 DataFrame

val df2: DataFrame = ds2.toDF()

df2.show()

+---+--------+---+

| id| name|age|

+---+--------+---+

| 1|zhangsan| 30|

| 2| lisi| 35|

+---+--------+---+

print("-----------------RDD <=> DataSet-----------------")

// TODO RDD 转 DataSet

val ds3: Dataset[User] = rdd.map {

case (id, name, age) => {

User(id, name, age)

}

}.toDS()

ds3.show()

+---+--------+---+

| id| name|age|

+---+--------+---+

| 1|zhangsan| 30|

| 2| lisi| 35|

+---+--------+---+

// TODO DataSet 转 RDD

val userRDD: RDD[User] = ds3.rdd

userRDD.collect().foreach(println)

User(1,zhangsan,30)

User(2,lisi,35)

// TODO 关闭环境

spark.close()

// main方法外的样例类

case class User(id: Int, name: String, age: Int)

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: