视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili

尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy、filter、sample、distinct、coalesce、repartition、sortBy、intersection、union、subtract、zip)】尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,partitionBy、reduceByKey、groupByKey、aggregateByKey、foldByKey、combineByKey、sortByKey、join、leftOuterJoin、cogroup)】

目录

01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48

P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50

P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39

P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31

P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11

P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49

P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24

P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34

P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19

P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05

P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02

P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28

P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42

P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41

P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54

P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33

P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21

P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13

P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49

P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46

P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12

P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54

P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12

P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49

P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21

P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30

P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07

P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41

P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33

P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48

P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25

P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01

P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51

P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11

P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11

P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13

P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11

P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28

P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31

P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19

P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10

01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48

第5章 Spark核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

➢ RDD : 弹性分布式数据集

➢ 累加器:分布式共享只写变量

➢ 广播变量:分布式共享只读变量

5.1 RDD

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

 

package com.atguigu.bigdata.spark.core.test

import java.io.OutputStream

import java.net.Socket

object Driver {

def main(args: Array[String]): Unit = {

//连接服务器

val client = new Socket("localhost", 9999)

val out: OutputStream = client.getOutputStream

out.write(2) //发送数据

out.flush()

out.close()

client.close()

}

}

package com.atguigu.bigdata.spark.core.test

import java.io.InputStream

import java.net.{ServerSocket, Socket}

object Executor {

def main(args: Array[String]): Unit = {

//启动服务器,接收数据

val server = new ServerSocket(9999)

println("服务器启动,等待接收数据...")

//等待客户端的连接

val client: Socket = server.accept()

val in: InputStream = client.getInputStream

val i: Int = in.read()

println("接收到客户端发送的数据:" + i)

in.close()

client.close()

server.close()

}

}

P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50

 

package com.atguigu.bigdata.spark.core.test

class Task extends Serializable { //最基本的计算任务

val datas = List(1, 2, 3, 4)

//val logic = (num: Int) => { num * 2 }

val logic: (Int) => Int = _ * 2

//计算

def compute() = {

datas.map(logic)

}

}

package com.atguigu.bigdata.spark.core.test

import java.io.{ObjectOutputStream, OutputStream}

import java.net.Socket

object Driver {

def main(args: Array[String]): Unit = {

//连接服务器

val client = new Socket("localhost", 9999)

val out: OutputStream = client.getOutputStream

val objOut = new ObjectOutputStream(out)

val task = new Task()

objOut.writeObject(task)

objOut.flush()

objOut.close()

client.close()

println("客户端数据发送完毕。")

}

}

package com.atguigu.bigdata.spark.core.test

import java.io.{InputStream, ObjectInputStream}

import java.net.{ServerSocket, Socket}

object Executor {

def main(args: Array[String]): Unit = {

//启动服务器,接收数据

val server = new ServerSocket(9999)

println("服务器启动,等待接收数据...")

//等待客户端的连接

val client: Socket = server.accept()

val in: InputStream = client.getInputStream

val objIn = new ObjectInputStream(in)

val task: Task = objIn.readObject().asInstanceOf[Task]

val ints: List[Int] = task.compute()

println("节点计算任务的节点为:" + ints)//计算节点计算的结果为

objIn.close()

client.close()

server.close()

}

}

P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39

Task与SubTask需要具有相同的逻辑。

   

 

package com.atguigu.bigdata.spark.core.test

class Task extends Serializable { //最基本的计算任务

val datas = List(1, 2, 3, 4)

//val logic = (num: Int) => { num * 2 }

val logic: (Int) => Int = _ * 2

//计算

def compute() = {

datas.map(logic)

}

}

package com.atguigu.bigdata.spark.core.test

class SubTask extends Serializable {

var datas: List[Int] = _

var logic: (Int) => Int = _

//计算

def compute() = {

datas.map(logic)

}

}

package com.atguigu.bigdata.spark.core.test

import java.io.{ObjectOutputStream, OutputStream}

import java.net.Socket

object Driver {

def main(args: Array[String]): Unit = {

//连接服务器

val client1 = new Socket("localhost", 9999)

val client2 = new Socket("localhost", 8888)

val task = new Task()

val out1: OutputStream = client1.getOutputStream

val objOut1 = new ObjectOutputStream(out1)

val subTask = new SubTask()

subTask.logic = task.logic

subTask.datas = task.datas.take(2)

objOut1.writeObject(subTask)

objOut1.flush()

objOut1.close()

client1.close()

val out2: OutputStream = client2.getOutputStream

val objOut2 = new ObjectOutputStream(out2)

val subTask1 = new SubTask()

subTask1.logic = task.logic

subTask1.datas = task.datas.takeRight(2)

objOut2.writeObject(subTask1)

objOut2.flush()

objOut2.close()

client2.close()

println("客户端数据发送完毕...")

}

}

package com.atguigu.bigdata.spark.core.test

import java.io.{InputStream, ObjectInputStream}

import java.net.{ServerSocket, Socket}

object Executor {

def main(args: Array[String]): Unit = {

//启动服务器,接收数据

val server = new ServerSocket(9999)

println("服务器启动,等待接收数据...")

//等待客户端的连接

val client: Socket = server.accept()

val in: InputStream = client.getInputStream

val objIn = new ObjectInputStream(in)

val task: SubTask = objIn.readObject().asInstanceOf[SubTask]

val ints: List[Int] = task.compute()

println("计算节点[9999]计算的结果为:" + ints)

objIn.close()

client.close()

server.close()

}

}

package com.atguigu.bigdata.spark.core.test

import java.io.{InputStream, ObjectInputStream}

import java.net.{ServerSocket, Socket}

object Executor2 {

def main(args: Array[String]): Unit = {

//启动服务器,接收数据

val server = new ServerSocket(8888)

println("服务器启动,等待接收数据...")

//等待客户端的连接

val client: Socket = server.accept()

val in: InputStream = client.getInputStream

val objIn = new ObjectInputStream(in)

val task: SubTask = objIn.readObject().asInstanceOf[SubTask]

val ints: List[Int] = task.compute()

println("计算节点[8888]计算的结果为:" + ints)

objIn.close()

client.close()

server.close()

}

}

P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31

5.1 RDD

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

画图工具:Balsamiq Mockups 3

P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11

 

P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49

P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24

 

P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

➢ 弹性

⚫ 存储的弹性:内存与磁盘的自动切换;⚫ 容错的弹性:数据丢失可以自动恢复;⚫ 计算的弹性:计算出错重试机制;⚫ 分片的弹性:可根据需要重新分片。➢ 分布式:数据存储在大数据集群不同节点上➢ 数据集:RDD封装了计算逻辑,并不保存数据➢ 数据抽象:RDD是一个抽象类,需要子类具体实现➢ 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑➢ 可分区、并行计算

P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19

5.1.2 核心属性

RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05

5.1.3 执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。

Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务;然后将任务发到已经分配资源的计算节点上,按照指定的计算模型进行数据计算;最后得到计算结果。

P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02

5.1.4 基础编程

5.1.4.1 RDD创建

在Spark中创建RDD的创建方式可以分为四种:

1) 从集合(内存)中创建 RDD2) 从外部存储(文件)创建 RDD3) 从其他 RDD 创建4) 直接创建 RDD(new)

ctrl+p:快捷键,提示函数参数列表。

 

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Memory {

def main(args: Array[String]): Unit = {

//TODO 准备环境

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //*表示当前系统的最大可用核数

val sc = new SparkContext(sparkConf)

//TODO 创建RDD

//从内存中创建RDD,将内存中集合的数据作为处理的数据源

val seq = Seq[Int](1, 2, 3, 4)

//parallelize:并行

//val rdd: RDD[Int] = sc.parallelize(seq)

//makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法。

val rdd: RDD[Int] = sc.makeRDD(seq)

rdd.collect().foreach(println)

//TODO 关闭环境

sc.stop()

}

}

P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28

 

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_File {

def main(args: Array[String]): Unit = {

//TODO 准备环境

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

val sc = new SparkContext(sparkConf)

//TODO 创建RDD

//从文件中创建RDD,将文件中的数据作为处理的数据源

//path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径

//sc.textFile("D:\\allCode\\JetBrains\\IdeaProjects\\atguigu-classes\\datas\\1.txt")

//val rdd: RDD[String] = sc.textFile("datas/1.txt")

//path路径可以是文件的具体路径,也可以目录名称

//val rdd = sc.textFile("datas")

//path路径还可以使用通配符 *

val rdd = sc.textFile("datas/1*.txt")

//path还可以是分布式存储系统路径:HDFS

//val rdd = sc.textFile("hdfs://node1:8020/test.txt")

rdd.collect().foreach(println)

//TODO 关闭环境

sc.stop()

}

}

P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_File1 {

def main(args: Array[String]): Unit = {

//TODO 准备环境

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

val sc = new SparkContext(sparkConf)

//TODO 创建RDD

//从文件中创建RDD,将文件中的数据作为处理的数据源

//textFile:以行为单位来读取数据,读取的数据都是字符串

//wholeTextFiles:以文件为单位读取数据

//读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容

val rdd = sc.wholeTextFiles("datas")

rdd.collect().foreach(println)

//TODO 关闭环境

sc.stop()

}

}

P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41

5.1.4.2 RDD 并行度与分区

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Memory_Par {

def main(args: Array[String]): Unit = {

//TODO 准备环境

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

sparkConf.set("spark.default.parallelism", "5")//5个分区

val sc = new SparkContext(sparkConf)

//TODO 创建RDD

//RDD的并行度 & 分区

//makeRDD方法可以传递第二个参数,这个参数表示分区的数量

//第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认并行度)

// scheduler.conf.getInt("spark.default.parallelism", totalCores)

// spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism

// 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数

//val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

val rdd = sc.makeRDD(List(1, 2, 3, 4))

//将处理的数据保存成分区文件

rdd.saveAsTextFile("output")

//TODO 关闭环境

sc.stop()

}

}

P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Memory_Par1 {

def main(args: Array[String]): Unit = {

//TODO 准备环境

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

val sc = new SparkContext(sparkConf)

//TODO 创建RDD

//【1,2】,【3,4】

//val rdd = sc.makeRDD(List(1,2,3,4), 2)

//【1】,【2】,【3,4】

//val rdd = sc.makeRDD(List(1,2,3,4), 3)

//【1】,【2,3】,【4,5】

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)

//将处理的数据保存成分区文件

rdd.saveAsTextFile("output")

//TODO 关闭环境

sc.stop()

}

}

P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_File_Par {

def main(args: Array[String]): Unit = {

// TODO 准备环境

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

val sc = new SparkContext(sparkConf)

// TODO 创建RDD

// textFile可以将文件作为数据处理的数据源,默认也可以设定分区。

// minPartitions : 最小分区数量

// math.min(defaultParallelism, 2)

//val rdd = sc.textFile("datas/1.txt")

//如果不想使用默认的分区数量,可以通过第二个参数指定分区数

//Spark读取文件,底层其实使用的就是Hadoop的读取方式

//分区数量的计算方式:

// totalSize = 7

// goalSize = 7 / 2 = 3(byte)

//7 / 3 = 2...1 (1.1) + 1 = 3(分区)

val rdd = sc.textFile("datas/1.txt", 2)

rdd.saveAsTextFile("output")

// TODO 关闭环境

sc.stop()

}

}

P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_File_Par1 {

def main(args: Array[String]): Unit = {

//TODO 准备环境

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

val sc = new SparkContext(sparkConf)

//TODO 创建RDD

//TODO 数据分区的分配

//1. 数据以行为单位进行读取

// spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系

//2. 数据读取时以偏移量为单位,偏移量不会被重复读取

/*

偏移量

1@@ => 012

2@@ => 345

3 => 6

*/

//3. 数据分区的偏移量范围的计算

// 0 => [0, 3] => 12

// 1 => [3, 6] => 3

// 2 => [6, 7] =>

//【1,2】,【3】,【】

val rdd = sc.textFile("datas/1.txt", 2)

rdd.saveAsTextFile("output")

//TODO 关闭环境

sc.stop()

}

}

P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_File_Par2 {

def main(args: Array[String]): Unit = {

// TODO 准备环境

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

val sc = new SparkContext(sparkConf)

// TODO 创建RDD

// 14byte / 2 = 7byte

// 14 / 7 = 2(分区)

/*

1234567@@ => 012345678

89@@ => 9101112

0 => 13

[0, 7] => 1234567

[7, 14] => 890

*/

// 如果数据源为多个文件,那么计算分区时以文件为单位进行分区

val rdd = sc.textFile("datas/word.txt", 2)

rdd.saveAsTextFile("output003")

// TODO 关闭环境

sc.stop()

}

}

P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49

 

P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46

5.1.4.3 RDD转换算子

1) map

RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - map

val rdd = sc.makeRDD(List(1, 2, 3, 4))

// 1,2,3,4

// 2,4,6,8

//转换函数

def mapFunction(num: Int): Int = {

num * 2

}

//val mapRDD: RDD[Int] = rdd.map(mapFunction)

//val mapRDD: RDD[Int] = rdd.map((num: Int) => { num * 2 })

//val mapRDD: RDD[Int] = rdd.map((num: Int) => num * 2)

//val mapRDD: RDD[Int] = rdd.map((num) => num * 2)

//val mapRDD: RDD[Int] = rdd.map(num => num * 2)

val mapRDD: RDD[Int] = rdd.map(_ * 2)

mapRDD.collect().foreach(println)

sc.stop()

}

}

P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12

小功能:从服务器日志数据apache.log中获取用户请求URL资源路径。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Test {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - map

val rdd = sc.textFile("datas/apache.log")

// 长的字符串

// 短的字符串

val mapRDD: RDD[String] = rdd.map(

line => {

val datas = line.split(" ")

datas(6)

}

)

mapRDD.collect().foreach(println)

sc.stop()

}

}

P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Par {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - map

// 1. rdd的计算一个分区内的数据是一个一个地执行逻辑

// 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。

// 分区内数据的执行是有序的。

// 2. 不同分区数据计算是无序的。

val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)//2个分区

val mapRDD = rdd.map(

num => {

println(">>>>>>>> " + num)

num

}

)

val mapRDD1 = mapRDD.map(

num => {

println("######" + num)

num

}

)

mapRDD1.collect()

sc.stop()

}

}

P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12

2) mapPartitions

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - mapPartitions

val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

// mapPartitions:可以以分区为单位进行数据转换操作

// 但是会将整个分区的数据加载到内存进行引用

// 如果处理完的数据是不会被释放掉,存在对象的引用。

// 在内存较小,数据量较大的场合下,容易出现内存溢出。

val mpRDD: RDD[Int] = rdd.mapPartitions(

iter => {

println(">>>>>>>>>>")

iter.map(_ * 2)

}

)

mpRDD.collect().foreach(println)

sc.stop()

}

}

P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Transform_Test {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - mapPartitions

val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

// 【1,2】,【3,4】

// 【2】,【4】

val mpRDD = rdd.mapPartitions(

iter => {

List(iter.max).iterator

}

)

mpRDD.collect().foreach(println)

sc.stop()

}

}

P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21

思考一个问题:map和mapPartitions的区别?

数据处理角度

Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。功能的角度

Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据。性能的角度

Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。

完成比完美更重要。

P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30

3) mapPartitionsWithIndex

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Transform1 {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - mapPartitions

val rdd = sc.makeRDD(List(1, 2, 3, 4))

val mpiRDD = rdd.mapPartitionsWithIndex(

(index, iter) => {

// 1, 2, 3, 4

//(0,1)(2,2),(4,3),(6,4)

iter.map(

num => {

(index, num)

}

)

}

)

mpiRDD.collect().foreach(println)

sc.stop()

}

}

P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07

4) flatMap

 

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - flatMap

val rdd: RDD[List[Int]] = sc.makeRDD(List(

List(1, 2), List(3, 4)

))

val flatRDD: RDD[Int] = rdd.flatMap(

list => {

list

}

)

flatRDD.collect().foreach(println)

sc.stop()

}

}

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform1 {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - flatMap

val rdd: RDD[String] = sc.makeRDD(List(

"Hello Scala", "Hello Spark"

))

val flatRDD: RDD[String] = rdd.flatMap(

s => {

s.split(" ")

}

)

flatRDD.collect().foreach(println)

sc.stop()

}

}

P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41

小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform2 {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - flatMap

val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))

val flatRDD = rdd.flatMap(

data => {

data match {//模式匹配

case list: List[_] => list

case dat => List(dat)

}

}

)

flatRDD.collect().foreach(println)

sc.stop()

}

}

P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33

5) glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Transform_Test {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - glom

val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

// 【1,2】,【3,4】

// 【2】,【4】

// 【6】

val glomRDD: RDD[Array[Int]] = rdd.glom()

val maxRDD: RDD[Int] = glomRDD.map(

array => {

array.max

}

)

println(maxRDD.collect().sum)

sc.stop()

}

}

P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Part {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - map

val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

// 【1,2】,【3,4】

rdd.saveAsTextFile("output1")

val mapRDD = rdd.map(_ * 2)

// 【2,4】,【6,8】

mapRDD.saveAsTextFile("output2")

sc.stop()

}

}

P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25

6) groupBy

小功能:将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - groupBy

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组

// 相同的key值的数据会放置在一个组中

def groupFunction(num: Int) = {

num % 2

}

val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)

groupRDD.collect().foreach(println)

sc.stop()

}

}

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform1 {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - groupBy

val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)

// 分组和分区没有必然的关系

val groupRDD = rdd.groupBy(_.charAt(0))

groupRDD.collect().foreach(println)

sc.stop()

}

}

P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01

 

P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51

6) groupBy

小功能:从服务器日志数据apache.log中获取每个时间段访问量。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import java.text.SimpleDateFormat

import java.util.Date

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform_Test {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - groupBy

val rdd = sc.textFile("datas/apache.log")

val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(

line => {

val datas = line.split(" ")

val time = datas(3)

//time.substring(0, )

val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")

val date: Date = sdf.parse(time)

val sdf1 = new SimpleDateFormat("HH")

val hour: String = sdf1.format(date)

(hour, 1)

}

).groupBy(_._1)

timeRDD.map {

case (hour, iter) => {

(hour, iter.size)

}

}.collect.foreach(println)

sc.stop()

}

}

P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11

7) filter

小功能:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径。

 

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import java.text.SimpleDateFormat

import java.util.Date

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - filter

val rdd = sc.makeRDD(List(1, 2, 3, 4))

val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)

filterRDD.collect().foreach(println)

sc.stop()

}

}

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Transform_Test {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - filter

val rdd = sc.textFile("datas/apache.log")

rdd.filter(

line => {

val datas = line.split(" ")

val time = datas(3)

time.startsWith("17/05/2015")

}

).collect().foreach(println)

sc.stop()

}

}

P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11

8) sample

思考一个问题:有啥用,抽奖吗?使用场景:数据倾斜,分区:均衡、shuffle。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark08_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - sample

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

// sample算子需要传递三个参数

// 1. 第一个参数表示,抽取数据后是否将数据返回,true(放回)、false(丢弃)

// 2. 第二个参数表示,

// 如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念

// 如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数

// 3. 第三个参数表示,抽取数据时随机算法的种子

// 如果不传递第三个参数,那么使用的是当前系统时间

// println(rdd.sample(

// false,

// 0.4,

// 1

// ).collect().mkString(",")

// )

println(rdd.sample(

true,

2,

1

).collect().mkString(","))

sc.stop()

}

}

P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13

9) distinct

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark09_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - distinct

val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))

// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)

// (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)

// (1, null)(1, null)(1, null)

// (null, null) => null

// (1, null) => 1

val rdd1: RDD[Int] = rdd.distinct()

rdd1.collect().foreach(println)

sc.stop()

}

}

P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11

10) coalesce

 

 

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark10_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - coalesce

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

// coalesce方法默认情况下不会将分区的数据打乱重新组合

// 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜

// 如果想要让数据均衡,可以进行shuffle处理

// val newRDD: RDD[Int] = rdd.coalesce(2)

val newRDD: RDD[Int] = rdd.coalesce(2, true)

newRDD.saveAsTextFile("output004")

sc.stop()

}

}

P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28

11) repartition

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark11_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - repartition

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)

// coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。

// 所以如果想要实现扩大分区的效果,需要使用shuffle操作

// spark提供了一个简化的操作

// 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle

// 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle

// val newRDD: RDD[Int] = rdd.coalesce(3, true)

val newRDD: RDD[Int] = rdd.repartition(3)

newRDD.saveAsTextFile("output005")

sc.stop()

}

}

P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31

12) sortBy

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark12_RDD_Operator_Transform1 {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - sortBy

val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)

// sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式

// sortBy默认情况下,不会改变分区,但是中间存在shuffle操作。

val newRDD = rdd.sortBy(t => t._1.toInt, false)

newRDD.collect().foreach(println)

sc.stop()

}

}

P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19

13) intersection

14) union

15) subtract

16) zip

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - 双Value类型

// 交集,并集和差集要求两个数据源数据类型保持一致

// 拉链操作两个数据源的类型可以不一致

val rdd1 = sc.makeRDD(List(1, 2, 3, 4))

val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

// 交集 : 【3,4】

val rdd3: RDD[Int] = rdd1.intersection(rdd2)

println(rdd3.collect().mkString(","))

// 并集 : 【1,2,3,4,3,4,5,6】

val rdd4: RDD[Int] = rdd1.union(rdd2)

println(rdd4.collect().mkString(","))

// 差集 : 【1,2】

val rdd5: RDD[Int] = rdd1.subtract(rdd2)

println(rdd5.collect().mkString(","))

// 拉链 : 【1-3,2-4,3-5,4-6】

val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)

println(rdd6.collect().mkString(","))

sc.stop()

}

}

P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10

13) intersection

14) union

15) subtract

16) zip

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - 双Value类型

// 交集,并集和差集要求两个数据源数据类型保持一致

// 拉链操作两个数据源的类型可以不一致

val rdd1 = sc.makeRDD(List(1, 2, 3, 4))

val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

val rdd7 = sc.makeRDD(List("3", "4", "5", "6"))

// 交集 : 【3,4】

val rdd3: RDD[Int] = rdd1.intersection(rdd2)

//val rdd8 = rdd1.intersection(rdd7)

println(rdd3.collect().mkString(","))

// 并集 : 【1,2,3,4,3,4,5,6】

val rdd4: RDD[Int] = rdd1.union(rdd2)

println(rdd4.collect().mkString(","))

// 差集 : 【1,2】

val rdd5: RDD[Int] = rdd1.subtract(rdd2)

println(rdd5.collect().mkString(","))

// 拉链 : 【1-3,2-4,3-5,4-6】

val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)

val rdd8 = rdd1.zip(rdd7)

println(rdd6.collect().mkString(","))

sc.stop()

}

}

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform1 {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")

val sc = new SparkContext(sparkConf)

// TODO 算子 - 双Value类型

// Can't zip RDDs with unequal numbers of partitions: List(2, 4)

// 两个数据源要求分区数量要保持一致

// Can only zip RDDs with same number of elements in each partition

// 两个数据源要求分区中数据数量保持一致

val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)

val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 2)

val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)

println(rdd6.collect().mkString(","))

sc.stop()

}

}

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: