视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili
尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy、filter、sample、distinct、coalesce、repartition、sortBy、intersection、union、subtract、zip)】尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,partitionBy、reduceByKey、groupByKey、aggregateByKey、foldByKey、combineByKey、sortByKey、join、leftOuterJoin、cogroup)】
目录
01_尚硅谷大数据技术之SparkCore
第05章-Spark核心编程
P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48
P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50
P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39
P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31
P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11
P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49
P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24
P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34
P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19
P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05
P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02
P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28
P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42
P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41
P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54
P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33
P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21
P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13
P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49
P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46
P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12
P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54
P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12
P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49
P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21
P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30
P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07
P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41
P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33
P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48
P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25
P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01
P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51
P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11
P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11
P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13
P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11
P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28
P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31
P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19
P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10
01_尚硅谷大数据技术之SparkCore
第05章-Spark核心编程
P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48
第5章 Spark核心编程
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
➢ RDD : 弹性分布式数据集
➢ 累加器:分布式共享只写变量
➢ 广播变量:分布式共享只读变量
5.1 RDD
5.1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
package com.atguigu.bigdata.spark.core.test
import java.io.OutputStream
import java.net.Socket
object Driver {
def main(args: Array[String]): Unit = {
//连接服务器
val client = new Socket("localhost", 9999)
val out: OutputStream = client.getOutputStream
out.write(2) //发送数据
out.flush()
out.close()
client.close()
}
}
package com.atguigu.bigdata.spark.core.test
import java.io.InputStream
import java.net.{ServerSocket, Socket}
object Executor {
def main(args: Array[String]): Unit = {
//启动服务器,接收数据
val server = new ServerSocket(9999)
println("服务器启动,等待接收数据...")
//等待客户端的连接
val client: Socket = server.accept()
val in: InputStream = client.getInputStream
val i: Int = in.read()
println("接收到客户端发送的数据:" + i)
in.close()
client.close()
server.close()
}
}
P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50
package com.atguigu.bigdata.spark.core.test
class Task extends Serializable { //最基本的计算任务
val datas = List(1, 2, 3, 4)
//val logic = (num: Int) => { num * 2 }
val logic: (Int) => Int = _ * 2
//计算
def compute() = {
datas.map(logic)
}
}
package com.atguigu.bigdata.spark.core.test
import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket
object Driver {
def main(args: Array[String]): Unit = {
//连接服务器
val client = new Socket("localhost", 9999)
val out: OutputStream = client.getOutputStream
val objOut = new ObjectOutputStream(out)
val task = new Task()
objOut.writeObject(task)
objOut.flush()
objOut.close()
client.close()
println("客户端数据发送完毕。")
}
}
package com.atguigu.bigdata.spark.core.test
import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}
object Executor {
def main(args: Array[String]): Unit = {
//启动服务器,接收数据
val server = new ServerSocket(9999)
println("服务器启动,等待接收数据...")
//等待客户端的连接
val client: Socket = server.accept()
val in: InputStream = client.getInputStream
val objIn = new ObjectInputStream(in)
val task: Task = objIn.readObject().asInstanceOf[Task]
val ints: List[Int] = task.compute()
println("节点计算任务的节点为:" + ints)//计算节点计算的结果为
objIn.close()
client.close()
server.close()
}
}
P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39
Task与SubTask需要具有相同的逻辑。
package com.atguigu.bigdata.spark.core.test
class Task extends Serializable { //最基本的计算任务
val datas = List(1, 2, 3, 4)
//val logic = (num: Int) => { num * 2 }
val logic: (Int) => Int = _ * 2
//计算
def compute() = {
datas.map(logic)
}
}
package com.atguigu.bigdata.spark.core.test
class SubTask extends Serializable {
var datas: List[Int] = _
var logic: (Int) => Int = _
//计算
def compute() = {
datas.map(logic)
}
}
package com.atguigu.bigdata.spark.core.test
import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket
object Driver {
def main(args: Array[String]): Unit = {
//连接服务器
val client1 = new Socket("localhost", 9999)
val client2 = new Socket("localhost", 8888)
val task = new Task()
val out1: OutputStream = client1.getOutputStream
val objOut1 = new ObjectOutputStream(out1)
val subTask = new SubTask()
subTask.logic = task.logic
subTask.datas = task.datas.take(2)
objOut1.writeObject(subTask)
objOut1.flush()
objOut1.close()
client1.close()
val out2: OutputStream = client2.getOutputStream
val objOut2 = new ObjectOutputStream(out2)
val subTask1 = new SubTask()
subTask1.logic = task.logic
subTask1.datas = task.datas.takeRight(2)
objOut2.writeObject(subTask1)
objOut2.flush()
objOut2.close()
client2.close()
println("客户端数据发送完毕...")
}
}
package com.atguigu.bigdata.spark.core.test
import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}
object Executor {
def main(args: Array[String]): Unit = {
//启动服务器,接收数据
val server = new ServerSocket(9999)
println("服务器启动,等待接收数据...")
//等待客户端的连接
val client: Socket = server.accept()
val in: InputStream = client.getInputStream
val objIn = new ObjectInputStream(in)
val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
val ints: List[Int] = task.compute()
println("计算节点[9999]计算的结果为:" + ints)
objIn.close()
client.close()
server.close()
}
}
package com.atguigu.bigdata.spark.core.test
import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}
object Executor2 {
def main(args: Array[String]): Unit = {
//启动服务器,接收数据
val server = new ServerSocket(8888)
println("服务器启动,等待接收数据...")
//等待客户端的连接
val client: Socket = server.accept()
val in: InputStream = client.getInputStream
val objIn = new ObjectInputStream(in)
val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
val ints: List[Int] = task.compute()
println("计算节点[8888]计算的结果为:" + ints)
objIn.close()
client.close()
server.close()
}
}
P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31
5.1 RDD
5.1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
画图工具:Balsamiq Mockups 3
P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11
P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49
P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24
P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34
5.1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
➢ 弹性
⚫ 存储的弹性:内存与磁盘的自动切换;⚫ 容错的弹性:数据丢失可以自动恢复;⚫ 计算的弹性:计算出错重试机制;⚫ 分片的弹性:可根据需要重新分片。➢ 分布式:数据存储在大数据集群不同节点上➢ 数据集:RDD封装了计算逻辑,并不保存数据➢ 数据抽象:RDD是一个抽象类,需要子类具体实现➢ 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑➢ 可分区、并行计算
P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19
5.1.2 核心属性
RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05
5.1.3 执行原理
从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务;然后将任务发到已经分配资源的计算节点上,按照指定的计算模型进行数据计算;最后得到计算结果。
P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02
5.1.4 基础编程
5.1.4.1 RDD创建
在Spark中创建RDD的创建方式可以分为四种:
1) 从集合(内存)中创建 RDD2) 从外部存储(文件)创建 RDD3) 从其他 RDD 创建4) 直接创建 RDD(new)
ctrl+p:快捷键,提示函数参数列表。
package com.atguigu.bigdata.spark.core.rdd.builder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Memory {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //*表示当前系统的最大可用核数
val sc = new SparkContext(sparkConf)
//TODO 创建RDD
//从内存中创建RDD,将内存中集合的数据作为处理的数据源
val seq = Seq[Int](1, 2, 3, 4)
//parallelize:并行
//val rdd: RDD[Int] = sc.parallelize(seq)
//makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法。
val rdd: RDD[Int] = sc.makeRDD(seq)
rdd.collect().foreach(println)
//TODO 关闭环境
sc.stop()
}
}
P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28
package com.atguigu.bigdata.spark.core.rdd.builder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_File {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
//TODO 创建RDD
//从文件中创建RDD,将文件中的数据作为处理的数据源
//path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径
//sc.textFile("D:\\allCode\\JetBrains\\IdeaProjects\\atguigu-classes\\datas\\1.txt")
//val rdd: RDD[String] = sc.textFile("datas/1.txt")
//path路径可以是文件的具体路径,也可以目录名称
//val rdd = sc.textFile("datas")
//path路径还可以使用通配符 *
val rdd = sc.textFile("datas/1*.txt")
//path还可以是分布式存储系统路径:HDFS
//val rdd = sc.textFile("hdfs://node1:8020/test.txt")
rdd.collect().foreach(println)
//TODO 关闭环境
sc.stop()
}
}
P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42
package com.atguigu.bigdata.spark.core.rdd.builder
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_File1 {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
//TODO 创建RDD
//从文件中创建RDD,将文件中的数据作为处理的数据源
//textFile:以行为单位来读取数据,读取的数据都是字符串
//wholeTextFiles:以文件为单位读取数据
//读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
val rdd = sc.wholeTextFiles("datas")
rdd.collect().foreach(println)
//TODO 关闭环境
sc.stop()
}
}
P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41
5.1.4.2 RDD 并行度与分区
package com.atguigu.bigdata.spark.core.rdd.builder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Memory_Par {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
sparkConf.set("spark.default.parallelism", "5")//5个分区
val sc = new SparkContext(sparkConf)
//TODO 创建RDD
//RDD的并行度 & 分区
//makeRDD方法可以传递第二个参数,这个参数表示分区的数量
//第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认并行度)
// scheduler.conf.getInt("spark.default.parallelism", totalCores)
// spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
// 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
//val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
//将处理的数据保存成分区文件
rdd.saveAsTextFile("output")
//TODO 关闭环境
sc.stop()
}
}
P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54
package com.atguigu.bigdata.spark.core.rdd.builder
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Memory_Par1 {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
//TODO 创建RDD
//【1,2】,【3,4】
//val rdd = sc.makeRDD(List(1,2,3,4), 2)
//【1】,【2】,【3,4】
//val rdd = sc.makeRDD(List(1,2,3,4), 3)
//【1】,【2,3】,【4,5】
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
//将处理的数据保存成分区文件
rdd.saveAsTextFile("output")
//TODO 关闭环境
sc.stop()
}
}
P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33
package com.atguigu.bigdata.spark.core.rdd.builder
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_File_Par {
def main(args: Array[String]): Unit = {
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// TODO 创建RDD
// textFile可以将文件作为数据处理的数据源,默认也可以设定分区。
// minPartitions : 最小分区数量
// math.min(defaultParallelism, 2)
//val rdd = sc.textFile("datas/1.txt")
//如果不想使用默认的分区数量,可以通过第二个参数指定分区数
//Spark读取文件,底层其实使用的就是Hadoop的读取方式
//分区数量的计算方式:
// totalSize = 7
// goalSize = 7 / 2 = 3(byte)
//7 / 3 = 2...1 (1.1) + 1 = 3(分区)
val rdd = sc.textFile("datas/1.txt", 2)
rdd.saveAsTextFile("output")
// TODO 关闭环境
sc.stop()
}
}
P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21
package com.atguigu.bigdata.spark.core.rdd.builder
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_File_Par1 {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
//TODO 创建RDD
//TODO 数据分区的分配
//1. 数据以行为单位进行读取
// spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系
//2. 数据读取时以偏移量为单位,偏移量不会被重复读取
/*
偏移量
1@@ => 012
2@@ => 345
3 => 6
*/
//3. 数据分区的偏移量范围的计算
// 0 => [0, 3] => 12
// 1 => [3, 6] => 3
// 2 => [6, 7] =>
//【1,2】,【3】,【】
val rdd = sc.textFile("datas/1.txt", 2)
rdd.saveAsTextFile("output")
//TODO 关闭环境
sc.stop()
}
}
P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13
package com.atguigu.bigdata.spark.core.rdd.builder
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_File_Par2 {
def main(args: Array[String]): Unit = {
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// TODO 创建RDD
// 14byte / 2 = 7byte
// 14 / 7 = 2(分区)
/*
1234567@@ => 012345678
89@@ => 9101112
0 => 13
[0, 7] => 1234567
[7, 14] => 890
*/
// 如果数据源为多个文件,那么计算分区时以文件为单位进行分区
val rdd = sc.textFile("datas/word.txt", 2)
rdd.saveAsTextFile("output003")
// TODO 关闭环境
sc.stop()
}
}
P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49
P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46
5.1.4.3 RDD转换算子
1) map
RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型。
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - map
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// 1,2,3,4
// 2,4,6,8
//转换函数
def mapFunction(num: Int): Int = {
num * 2
}
//val mapRDD: RDD[Int] = rdd.map(mapFunction)
//val mapRDD: RDD[Int] = rdd.map((num: Int) => { num * 2 })
//val mapRDD: RDD[Int] = rdd.map((num: Int) => num * 2)
//val mapRDD: RDD[Int] = rdd.map((num) => num * 2)
//val mapRDD: RDD[Int] = rdd.map(num => num * 2)
val mapRDD: RDD[Int] = rdd.map(_ * 2)
mapRDD.collect().foreach(println)
sc.stop()
}
}
P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12
小功能:从服务器日志数据apache.log中获取用户请求URL资源路径。
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - map
val rdd = sc.textFile("datas/apache.log")
// 长的字符串
// 短的字符串
val mapRDD: RDD[String] = rdd.map(
line => {
val datas = line.split(" ")
datas(6)
}
)
mapRDD.collect().foreach(println)
sc.stop()
}
}
P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Par {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - map
// 1. rdd的计算一个分区内的数据是一个一个地执行逻辑
// 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
// 分区内数据的执行是有序的。
// 2. 不同分区数据计算是无序的。
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)//2个分区
val mapRDD = rdd.map(
num => {
println(">>>>>>>> " + num)
num
}
)
val mapRDD1 = mapRDD.map(
num => {
println("######" + num)
num
}
)
mapRDD1.collect()
sc.stop()
}
}
P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12
2) mapPartitions
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - mapPartitions
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// mapPartitions:可以以分区为单位进行数据转换操作
// 但是会将整个分区的数据加载到内存进行引用
// 如果处理完的数据是不会被释放掉,存在对象的引用。
// 在内存较小,数据量较大的场合下,容易出现内存溢出。
val mpRDD: RDD[Int] = rdd.mapPartitions(
iter => {
println(">>>>>>>>>>")
iter.map(_ * 2)
}
)
mpRDD.collect().foreach(println)
sc.stop()
}
}
P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - mapPartitions
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// 【1,2】,【3,4】
// 【2】,【4】
val mpRDD = rdd.mapPartitions(
iter => {
List(iter.max).iterator
}
)
mpRDD.collect().foreach(println)
sc.stop()
}
}
P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21
思考一个问题:map和mapPartitions的区别?
数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据。性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。
完成比完美更重要。
P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30
3) mapPartitionsWithIndex
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - mapPartitions
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
// 1, 2, 3, 4
//(0,1)(2,2),(4,3),(6,4)
iter.map(
num => {
(index, num)
}
)
}
)
mpiRDD.collect().foreach(println)
sc.stop()
}
}
P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07
4) flatMap
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - flatMap
val rdd: RDD[List[Int]] = sc.makeRDD(List(
List(1, 2), List(3, 4)
))
val flatRDD: RDD[Int] = rdd.flatMap(
list => {
list
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - flatMap
val rdd: RDD[String] = sc.makeRDD(List(
"Hello Scala", "Hello Spark"
))
val flatRDD: RDD[String] = rdd.flatMap(
s => {
s.split(" ")
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}
P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41
小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作。
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - flatMap
val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
val flatRDD = rdd.flatMap(
data => {
data match {//模式匹配
case list: List[_] => list
case dat => List(dat)
}
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}
P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33
5) glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - glom
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
// 【2】,【4】
// 【6】
val glomRDD: RDD[Array[Int]] = rdd.glom()
val maxRDD: RDD[Int] = glomRDD.map(
array => {
array.max
}
)
println(maxRDD.collect().sum)
sc.stop()
}
}
P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Transform_Part {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - map
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// 【1,2】,【3,4】
rdd.saveAsTextFile("output1")
val mapRDD = rdd.map(_ * 2)
// 【2,4】,【6,8】
mapRDD.saveAsTextFile("output2")
sc.stop()
}
}
P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25
6) groupBy
小功能:将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - groupBy
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
// 相同的key值的数据会放置在一个组中
def groupFunction(num: Int) = {
num % 2
}
val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - groupBy
val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
// 分组和分区没有必然的关系
val groupRDD = rdd.groupBy(_.charAt(0))
groupRDD.collect().foreach(println)
sc.stop()
}
}
P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01
P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51
6) groupBy
小功能:从服务器日志数据apache.log中获取每个时间段访问量。
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - groupBy
val rdd = sc.textFile("datas/apache.log")
val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
line => {
val datas = line.split(" ")
val time = datas(3)
//time.substring(0, )
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val date: Date = sdf.parse(time)
val sdf1 = new SimpleDateFormat("HH")
val hour: String = sdf1.format(date)
(hour, 1)
}
).groupBy(_._1)
timeRDD.map {
case (hour, iter) => {
(hour, iter.size)
}
}.collect.foreach(println)
sc.stop()
}
}
P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11
7) filter
小功能:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径。
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - filter
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)
filterRDD.collect().foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Transform_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - filter
val rdd = sc.textFile("datas/apache.log")
rdd.filter(
line => {
val datas = line.split(" ")
val time = datas(3)
time.startsWith("17/05/2015")
}
).collect().foreach(println)
sc.stop()
}
}
P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11
8) sample
思考一个问题:有啥用,抽奖吗?使用场景:数据倾斜,分区:均衡、shuffle。
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.{SparkConf, SparkContext}
object Spark08_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - sample
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// sample算子需要传递三个参数
// 1. 第一个参数表示,抽取数据后是否将数据返回,true(放回)、false(丢弃)
// 2. 第二个参数表示,
// 如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
// 如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
// 3. 第三个参数表示,抽取数据时随机算法的种子
// 如果不传递第三个参数,那么使用的是当前系统时间
// println(rdd.sample(
// false,
// 0.4,
// 1
// ).collect().mkString(",")
// )
println(rdd.sample(
true,
2,
1
).collect().mkString(","))
sc.stop()
}
}
P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13
9) distinct
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark09_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - distinct
val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
// (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
// (1, null)(1, null)(1, null)
// (null, null) => null
// (1, null) => 1
val rdd1: RDD[Int] = rdd.distinct()
rdd1.collect().foreach(println)
sc.stop()
}
}
P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11
10) coalesce
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark10_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - coalesce
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
// coalesce方法默认情况下不会将分区的数据打乱重新组合
// 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
// 如果想要让数据均衡,可以进行shuffle处理
// val newRDD: RDD[Int] = rdd.coalesce(2)
val newRDD: RDD[Int] = rdd.coalesce(2, true)
newRDD.saveAsTextFile("output004")
sc.stop()
}
}
P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28
11) repartition
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark11_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - repartition
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
// coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
// 所以如果想要实现扩大分区的效果,需要使用shuffle操作
// spark提供了一个简化的操作
// 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
// 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
// val newRDD: RDD[Int] = rdd.coalesce(3, true)
val newRDD: RDD[Int] = rdd.repartition(3)
newRDD.saveAsTextFile("output005")
sc.stop()
}
}
P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31
12) sortBy
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark12_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - sortBy
val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
// sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
// sortBy默认情况下,不会改变分区,但是中间存在shuffle操作。
val newRDD = rdd.sortBy(t => t._1.toInt, false)
newRDD.collect().foreach(println)
sc.stop()
}
}
P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19
13) intersection
14) union
15) subtract
16) zip
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
// 交集 : 【3,4】
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))
// 并集 : 【1,2,3,4,3,4,5,6】
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集 : 【1,2】
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// 拉链 : 【1-3,2-4,3-5,4-6】
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10
13) intersection
14) union
15) subtract
16) zip
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
val rdd7 = sc.makeRDD(List("3", "4", "5", "6"))
// 交集 : 【3,4】
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))
// 并集 : 【1,2,3,4,3,4,5,6】
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集 : 【1,2】
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// 拉链 : 【1-3,2-4,3-5,4-6】
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
val rdd8 = rdd1.zip(rdd7)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.operator.transform
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark13_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
// Can't zip RDDs with unequal numbers of partitions: List(2, 4)
// 两个数据源要求分区数量要保持一致
// Can only zip RDDs with same number of elements in each partition
// 两个数据源要求分区中数据数量保持一致
val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 2)
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
参考文章
发表评论