文章目录

1. count(distinct) 去重2. 双重group by 去重3. row_number() over() 窗口函数去重4. sortWithinPartitions + dropDuplicates5. mapPartitions + HashSet分区内去重

1. count(distinct) 去重

sql中最简单的方式,当数据量小的时候性能还好.当数据量大的时候性能较差.因为distinct全局只有一个reduce任务来做去重操作,极容易发生数据倾斜的情况,整体运行效率较慢. 示例: (对uid去重)

select count(distinct uid) uv,name,age

from A

group by name,age

2. 双重group by 去重

双重group by将去重分成了两步,是分组聚合运算,group by操作能进行多个reduce任务并行处理,每个reduce都能收到一部分数据然后进行分组内去重,不再像distinct只有一个reduce进行全局去重. 并且双重group by使用了“分而治之”的思想,先局部聚合和再进行全局汇总,整体效率要比distinct高很多.

首先根据uid,name,age进行分区内去重将分组内去重后的数据进行全局去重.

select count(uid) uv,name,age

from (

select uid,name,age

from A

group by uid,name,age

) a

group by name,age

3. row_number() over() 窗口函数去重

窗口函数(开窗)同样可实现去重功能,相比distinct和group by, row_number() over()要更加灵活,我们在日常需求中,经常遇到需要保留多条重复数据中的某一条或者某几条. 如:

相同的用户名中我们需要年龄最大的那一个相同数据中我们需要时间戳最小的那几个 这时候,row_number() over()可以完美解决我们的问题

row_number() over(partiiton by xxx order by xxx [asc / desc])

# 相同用户名只取年龄最大的那一个

select uname

from (

select uname,row_number() over(partition by age desc) rn

from A

) s

where rn = 1

# 相同数据中,只取时间戳最小的3条数据

select uid

from (

select uid, row_number() over(partition by uid order by log_timestamp asc) rn

from A

) s

where rn <= 3

4. sortWithinPartitions + dropDuplicates

DataFrame中,可以先将分区内数据进行排序,然后通过dropDuplicates将重复的数据删除.

sortWithinPartitions: 将分区内的数据进行排序,通过源码我们可以看出此算子和sql中sort by作用相同,并且此算子是对每个executor中数据进行排序,并不会发生shuffle,所以效率较高.但是此算子并没办法对排序的字段进行倒排(desc),默认是正排(asc).dropDuplicates: 此算子是将排序中重复的字段进行去重,最终只保留第一条. 本人理解,此操作和sublime中先排序再通过正则去重的思路完全一致 根据logid排序并进行去重,最终只保留一条logid

val newExt = extractResDf

.sortWithinPartitions(col("logid"))

.dropDuplicates("logid")

5. mapPartitions + HashSet分区内去重

因为业务需要,上面的效率都有些慢,无法满足日常跑数,我们是不是可以省去对分区内数据的排序操作,直接进行去重操作呢? 于是,我就想到了 mapPartitions算子 + HashSet进行分区内去重,就省去了排序的步骤,效率更高.

val testSchema: StructType = testDf.schema

val testEncoder: ExpressionEncoder[Row] = RowEncoder(testSchema)

testDf.mapPartitions(line => {

val rowBuf = new scala.collection.mutable.ArrayBuffer[Row]()

val logidSet = new mutable.HashSet[String]

line.foreach(row => {

if (logidSet.add(row.getAs[String]("logid"))) {

rowBuf.append(row)

}

})

rowBuf.iterator

})(testEncoder)

这里使用mapPartition遍历每个分区的数据, 然后每个分区中有一个HashSet,向HashSet中添加数据,如果

logidSet.add(row.getAs[String]("logid"))

方法返回结果为true,则说明HashSet中不存在此logid,不进行去重; 如果返回结果为false,则说明此logid已经存在,不放入到ArrayBuffer中即可,最终就能实现每个logid只在分区内保留一条的结果.

注: 如果不加endocer,会报如下错: Error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: