一、RDD概念

RDD(英文全称Resilient Distributed Dataset),即弹性分布式数据集是spark中引入的一个数据结构,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

Resilient弹性:RDD的数据可以存储在内存或者磁盘当中,RDD的数据可以分区。

Distributed分布式:RDD的数据可以分布式存储,可以进行并行计算。

Dataset数据集:一个用于存放数据的集合。

二、RDD算子

        指的是RDD对象中提供了非常多的具有特殊功能的函数, 我们将这些函数称为算子(函数/方法/API)。

RDD算子分为两类:

        Transformation(转换算子):              返回值: 是一个新的RDD             特点: 转换算子只是定义数据的处理规则,并不会立即执行,是lazy(惰性)的。需要由Action算子触发              Action(动作算子):             返回值: 要么没有返回值None,或者返回非RDD类型的数据             特点: 动作算子都是立即执行。执行的时候,会将它上游的其他算子一同触发执行

        以下演示Spark学习用到的相关RDD算子,通过SecureCRTPortable客户端远程连接Linux服务器操作pyspark。

        连接界面如下:

三、RDD的转换算子

(一)(单)值类型算子

1、map算子

        格式:rdd.map(fn)         作用:  主要根据传入的函数,对数据进行一对一的转换操作,传入一行,返回一行。

        需求: 数字加一后返回

        代码:

init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])

init_rdd.map(lambda num:num+1).collect()

#运行结果:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

2、groupBy 算子

        格式: groupBy(fn)         作用: 根据用户传入的自定义函数,对数据进行分组操作

        需求: 将数据分成奇数和偶数

        代码:

>>> init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])

>>> init_rdd.groupBy(lambda num:"偶数" if num % 2 == 0 else "奇数").mapValues(list).collect()

# 运行结果:[('偶数', [0, 2, 4, 6, 8]), ('奇数', [1, 3, 5, 7, 9])]

        说明:mapValues(list)将数据类型转成List列表

3、filter算子

        格式:filter(fn)         作用:根据用户传入的自定义函数对数据进行过滤操作。自定义函数的返回值类型是bool类型。True表示满足过滤条件,会将数据保留下来;False会将数据丢弃掉。

        需求:过滤掉数值<=3的数据

        代码:

>>> init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])

>>> init_rdd.filter(lambda num:num > 3).collect()

# 运行结果 [4, 5, 6, 7, 8, 9]

4、flatMap算子

        格式:rdd.flatMap(fn)

        作用:在map算子的基础上,加入一个压扁的操作, 主要适用于一行中包含多个内容的操作,实现一转多的操作

        需求:将姓名一个一个的输出

        代码:

>>> init_rdd = sc.parallelize(['张三 李四 王五','赵六 周日'])

>>> init_rdd.flatMap(lambda line:line.split()).collect()

# 运行结果 ['张三', '李四', '王五', '赵六', '周日']

        说明: split()默认会按照空白字符对内容进行切分处理。例如:空格、制表符、回车。还是推荐大家明确指定所需要分割的符号。

(二)双值类型算子

        双值类型算子主要有union(并集) 和intersection(交集)

        格式:rdd1.union(rdd2)、 rdd1.intersection(rdd2)

        代码:

>>> rdd1 = sc.parallelize([3,3,2,6,8,0])

>>> rdd2 = sc.parallelize([3,2,1,5,7])

>>> rdd1.union(rdd2).collect()

# 并集运行结果 [3, 3, 2, 6, 8, 0, 3, 2, 1, 5, 7]

>>>

>>> rdd1.union(rdd2).distinct().collect()

# 并集去重运行结果 [8, 0, 1, 5, 2, 6, 3, 7]

>>>

>>> rdd1.intersection(rdd2).collect()

# 交集运行结果 [2, 3]

        说明:union取并集不会对重复出现的数据去,distinct()是转换算子,用来对RDD中的元素进行去重处理。交集会对结果数据进行去重处理。

(三)key-value数据类型算子

1、groupByKey()

        格式: rdd.groupByKey()

        作用: 对键值对类型的RDD中的元素按照键key进行分组操作。只会进行分组。

        需求:对学生按照班级分组统计

        代码:

>>> rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c01','赵六'),('c03','田七'),('c03','周八'),('c02','李九')])

>>> rdd.groupByKey().mapValues(list).collect()

# 运行结果 [('c01', ['张三', '赵六']), ('c02', ['李四', '王五', '李九']), ('c03', ['田七', '周八'])]

2、reduceByKey()

        格式: rdd.reduceByKey(fn)         作用: 根据key进行分组,将一个组内的value数据放置到一个列表中,对这个列表基于fn进行聚合计算操作

        需求:统计每个班级学生人数

        代码:

>>> rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c01','赵六'),('c03','田七'),('c03','周八'),('c02','李九')])

>>> rdd.map(lambda tup:(tup[0],1)).reduceByKey(lambda agg,curr:agg+curr).collect()

# 运行结果[('c01', 2), ('c02', 3), ('c03', 2)]

3、sortByKey()算子

        格式:rdd.sortByKey(ascending=True|False)         作用:  根据key进行排序操作,默认按照key进行升序排序,如果需要降序,设置 ascending 参数的值为False。

        需求1:根据key进行排序操作,演示升序

        代码:

>>> rdd = sc.parallelize([(10,2),(15,3),(8,4),(7,4),(2,4),(12,4)])

>>> rdd.sortByKey().collect()

# 运行结果[(2, 4), (7, 4), (8, 4), (10, 2), (12, 4), (15, 3)]

        需求2:根据key进行排序操作,演示降序

        代码:

>>> rdd = sc.parallelize([(10,2),(15,3),(8,4),(7,4),(2,4),(12,4)])

>>> rdd.sortByKey(ascending=False).collect()

# 运行结果 [(15, 3), (12, 4), (10, 2), (8, 4), (7, 4), (2, 4)]

        需求3:根据key进行排序操作,演示升序

        代码:根据key进行排序操作,演示升序

>>> rdd = sc.parallelize([('a01',2),('A01',3),('a011',2),('a03',2),('a021',2),('a04',2)])>>> rdd.sortByKey().collect()

# 运行结果 [('A01', 3), ('a01', 2), ('a011', 2), ('a021', 2), ('a03', 2), ('a04', 2)]

        说明:对字符串类型的key进行排序的时候,按照ASCII码表进行排序。大写字母排在小写字母的前面;如果前缀一样,短的排在前面,长的排在后面。

四、RDD的动作算子

1、collect() 算子:

        格式: collect()

        作用: 收集各个分区的数据,将数据汇总到一个大的列表返回

        使用:在需要执行的转换算子最后加上collect()

2、reduce() 算子

        格式: reduce(fn)         作用: 根据用户传入的自定义函数,对数据进行聚合操作。该算子是Action动作算子;而reduceByKey是Transformation转换算子。

        需求:统计所有元素之和是多少

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

>>> def mysum(agg,curr):

... print(f"中间临时聚合结果{agg},当前遍历到的元素{curr}")

... return agg+curr

...

>>> rdd.reduce(mysum)

###

运行结果

中间临时聚合结果6,当前遍历到的元素7

中间临时聚合结果13,当前遍历到的元素8

中间临时聚合结果21,当前遍历到的元素9

中间临时聚合结果30,当前遍历到的元素10

中间临时聚合结果1,当前遍历到的元素2

中间临时聚合结果3,当前遍历到的元素3

中间临时聚合结果6,当前遍历到的元素4

中间临时聚合结果10,当前遍历到的元素5

中间临时聚合结果15,当前遍历到的元素40

55

###

        说明: 初始化的时候,agg,表示中间临时聚合结果,默认取列表中的第一个元素值,curr表示当前遍历到的元素,默认取列表中的第二个元素的值。

3、first() 算子

        格式: rdd.first()         作用: 取RDD中的第一个元素。(不会对RDD中的数据排序)

        需求:

        代码:

>>> rdd = sc.parallelize([3,1,2,4,5,6,7,8,9,10])

>>> rdd.first()

# 运行结果 3

4、take() 算子

        格式: rdd.take(N)         说明: 取RDD中的前N元素。(不会对RDD中的数据排序)

        需求:获取前3个元素

        代码:

>>> rdd = sc.parallelize([3,1,2,4,5,6,7,8,9,10])

>>> rdd.take(3)

# 运行结果 [3, 1, 2]

5、top()算子

        格式: top(N,[fn])         作用: 对数据集进行倒序排序操作,如果kv(键值对)类型,针对key进行排序,获取前N个元素         fn: 可以自定义排序,按照谁来排序

        需求1:获取前3个元素

        代码:

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

>>> rdd.top(3)

# 运行结果 [10, 9, 8]

        需求2:按照班级人数降序排序,取前2个

        代码:

>>> rdd = sc.parallelize([('c01',5),('c02',8),('c04',1),('c03',4)])

>>> rdd.top(2,key=lambda tup:tup[1])

#运行结果 [('c02', 8), ('c01', 5)]

        需求3:按照班级人数升序排序,取前2个

>>> rdd = sc.parallelize([('c01',5),('c02',8),('c04',1),('c03',4)])

>>> rdd.top(2,key= lambda tup:-tup[1])

# 运行结果 [('c04', 1), ('c03', 4)]

6、count() 算子

        格式:count()         作用:统计RDD中一共有多少个元素

        需求:获取一共有多少个元素

        代码:

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

>>> rdd.count()

# 运行结果 10

7、foreach() 算子

        格式: foreach(fn)         作用: 遍历RDD中的元素,对元素根据传入的函数进行自定义的处理

        需求:对数据进行遍历打印

        代码:

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

>>> rdd.foreach(lambda num:print(num))

###

运行结果

6

7

8

9

10

1

2

3

4

5

###

        说明:              1- foreach()算子对自定义函数不要求有返回值,另外该算子也没有返回值             2- 因为底层是多线程运行的,因此输出结果分区间可能乱序             3- 该算子一般用来对结果数据保存到数据库或者文件中

参考阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: