文章目录

一、函数使用示例1. `map`2. `flatMap`3. `reduceByKey`4. `sortBy`5. `take`6. `distinct`7. `saveAsTextFile`8. `textFile`

二、函数使用场景及注意事项1. `map`2. `flatMap`3. `reduceByKey`4. `sortBy`5. `take`6. `distinct`7. `saveAsTextFile`8. `textFile`

三、总结示例

一、函数使用示例

之前写了PySpark的简单使用,下面汇总下常用函数,及使用示例。

1. map

作用:对 RDD/Dataset 中的每个元素应用一个转换操作,并返回新的 RDD/Dataset。示例:rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

mapped_rdd = rdd.map(lambda x: x * 2)

print(mapped_rdd.collect()) # 输出 [2, 4, 6, 8, 10]

2. flatMap

作用:对 RDD/Dataset 中的每个元素应用一个返回多个元素的转换操作,并返回扁平化的结果。示例:rdd = spark.sparkContext.parallelize(["Hello World", "Spark is great"])

flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))

print(flat_mapped_rdd.collect()) # 输出 ['Hello', 'World', 'Spark', 'is', 'great']

3. reduceByKey

作用:对键值对类型的 RDD/Dataset 进行分组,并对每个键的值应用聚合函数。示例:rdd = spark.sparkContext.parallelize([(1, 2), (1, 4), (2, 3), (2, 5)])

reduced_rdd = rdd.reduceByKey(lambda a, b: a + b)

print(reduced_rdd.collect()) # 输出 [(1, 6), (2, 8)]

4. sortBy

作用:对 RDD/Dataset 中的元素进行排序。示例:rdd = spark.sparkContext.parallelize([5, 3, 1, 4, 2])

sorted_rdd = rdd.sortBy(lambda x: x)

print(sorted_rdd.collect()) # 输出 [1, 2, 3, 4, 5]

5. take

作用:返回 RDD/Dataset 中的前几个元素。示例:rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

taken_elements = rdd.take(3)

print(taken_elements) # 输出 [1, 2, 3]

6. distinct

作用:去除 RDD/Dataset 中的重复元素。示例:rdd = spark.sparkContext.parallelize([1, 2, 3, 2, 4, 3, 5])

distinct_rdd = rdd.distinct()

print(distinct_rdd.collect()) # 输出 [1, 2, 3, 4, 5]

7. saveAsTextFile

作用:将 RDD/Dataset 保存为文本文件。示例:rdd = spark.sparkContext.parallelize(["Hello", "World", "Spark"])

rdd.saveAsTextFile("output.txt")

8. textFile

作用:从文本文件中创建一个 RDD/Dataset。示例:rdd = spark.sparkContext.textFile("data.txt")

print(rdd.collect())

二、函数使用场景及注意事项

下面是关于 Spark 常用函数的使用场景和注意事项的汇总:

以下是 PySpark 常用函数的使用场景和注意事项的汇总:

1. map

使用场景:对 RDD/Dataset 中的每个元素应用一个转换操作,并返回新的 RDD/Dataset。注意事项:

输入和输出的元素类型可以不同。在转换操作中,可以使用 lambda 表达式、函数或方法。适用于处理每个元素独立的转换操作,如元素翻倍、字符串拆分等。

2. flatMap

使用场景:对 RDD/Dataset 中的每个元素应用一个返回多个元素的转换操作,并返回扁平化的结果。注意事项:

输入和输出的元素类型可以不同。在转换操作中,可以使用 lambda 表达式、函数或方法。适用于将元素拆分为多个子元素的操作,如字符串拆分成单词、列表展开等。

3. reduceByKey

使用场景:对键值对类型的 RDD/Dataset 进行分组,并对每个键的值应用聚合函数。注意事项:

输入是键值对类型的 RDD/Dataset。需要提供一个聚合函数来定义对值的聚合规则。适用于按键进行分组并聚合值的操作,如求和、计数等。

4. sortBy

使用场景:对 RDD/Dataset 中的元素进行排序。注意事项:

可以指定一个或多个排序字段。默认情况下,按升序排序,可以通过设置参数进行降序排序。适用于对元素进行排序的操作。

5. take

使用场景:返回 RDD/Dataset 中的前几个元素。注意事项:

需要提供要获取的元素数量作为参数。结果将作为数组返回。适用于需要获取前几个元素的操作。

6. distinct

使用场景:去除 RDD/Dataset 中的重复元素。注意事项:

根据元素的值进行去重。结果将保留一个唯一的元素集合。适用于需要去除重复元素的操作。

7. saveAsTextFile

使用场景:将 RDD/Dataset 保存为文本文件。注意事项:

指定保存路径和文件名。可以将 RDD/Dataset 保存为一个或多个文本文件。适用于将结果保存到文本文件中的操作。

8. textFile

使用场景:从文本文件中创建一个 RDD/Dataset。注意事项:

指定要读取的文本文件路径。可以读取一个或多个文本文件,并将每一行作为 RDD/Dataset 的一个元素。适用于从文本文件中加载数据的操作。

三、总结示例

from pyspark.sql import SparkSession

# 创建 SparkSession

spark = SparkSession.builder \

.appName("Data Example") \

.getOrCreate()

# 读取文本文件作为 DataFrame

data = spark.read.text("data.txt")

# 对每一行数据进行转换和处理

processed_data = data.rdd.map(lambda row: row[0].split(" ")) \

.flatMap(lambda words: [(word, 1) for word in words]) \

.reduceByKey(lambda a, b: a + b)

# 将结果按照词频排序

sorted_data = processed_data.sortBy(lambda x: x[1], ascending=False)

# 取出前 10 个结果

top_10_words = sorted_data.take(10)

# 打印结果

for word, count in top_10_words:

print(f"{word}: {count}")

# 保存结果为文本文件

sorted_data.saveAsTextFile("result.txt")

# 关闭 SparkSession

spark.stop()

上述示例演示了以下步骤:

创建 SparkSession。通过 read.text() 方法读取文本文件,并创建一个 DataFrame。对每一行数据应用转换操作,使用 map() 和 flatMap() 分别进行单词拆分和计数。使用 reduceByKey() 对相同单词的计数进行求和。使用 sortBy() 对计数结果进行降序排序。使用 take() 提取前 10 个结果,并打印输出。使用 saveAsTextFile() 将排序后的结果保存为文本文件。关闭 SparkSession。

参考链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: