Flink是一种一站式处理的框架,既可以进行批处理(DataSet),也可以进行流处理(DataStream)

将Flink的算子分为两大类:DataSet 和 DataStream

DataSet

1. Source源算子

1.1 fromCollection

从本地集合读取数据

val env = ExecutionEnvironment.getExecutionEnvironment

val textDataSet: DataSet[String] = env.fromCollection(

List("1,张三", "2,李四", "3,王五", "4,赵六")

)

1.2 readTextFile

从文件中读取

val textDataSet: DataSet[String] = env.readTextFile("/data/a.txt")

1.3 readTextFile 遍历目录

对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式

val parameters = new Configuration

// recursive.file.enumeration 开启递归

parameters.setBoolean("recursive.file.enumeration", true)

val file = env.readTextFile("/data").withParameters(parameters)

1.4 readTextFile 读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性

val file = env.readTextFile("/data/file.gz")

2.Transform转换算子

因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子

val env = ExecutionEnvironment.getExecutionEnvironment

val textDataSet: DataSet[String] = env.fromCollection(

List("张三,1", "李四,2", "王五,3", "张三,4")

)

2.1 map

将DataSet中的每一个元素转换为另一个元素

底层为MapFunction算子。通过调用map函数,对每个元素执行操作。常用于数据清洗、计算和转换等。

// 使用map将List转换为一个Scala的样例类

case class User(name: String, id: String)

val userDataSet: DataSet[User] = textDataSet.map {

text =>

val fieldArr = text.split(",")

User(fieldArr(0), fieldArr(1))

}

userDataSet.print()

2.2 flatmap

将DataSet中的每一个元素转换为n个元素

// 使用flatMap操作,将集合中的数据:

// 根据第一个元素,进行分组

// 根据第二个元素,进行聚合求值

val result = textDataSet.flatMap(line => line)

.groupBy(0) // 根据第一个元素,进行分组

.sum(1) // 根据第二个元素,进行聚合求值

result.print()

2.3 mapPartition

将一个分区中的元素转换为另一个元素

// 使用mapPartition操作,将List转换为一个scala的样例类

case class User(name: String, id: String)

val result: DataSet[User] = textDataSet.mapPartition(line => {

line.map(index => User(index._1, index._2))

})

result.print()

2.4 filter

过滤出来一些符合条件的数据

val source: DataSet[String] = env.fromElements("java", "scala", "java")

val filter:DataSet[String] = source.filter(line => line.contains("java"))//过滤出带java的数据

filter.print()

2.5 keyBy算子

根据指定key对DataStream数据集分区

        相同key值的数据归并到同一分区

val inputStream = env.fromElements(

("aa", 11), ("aa", 22), ("bb", 33)

)

// 根据第一个字段作为key分区

// 转换为KeyedStream[(String, String), Tuple]

val keyedStream: inputStream.keyBy(0)

2.6 reduce 

将一个DataSet或者一个group来进行聚合计算,最终聚合成一个元素

根据key分区聚合形成KeyedStream支持运算符和自定义reduceFunc函数

// 使用 fromElements 构建数据源

val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))

// 使用map转换成DataSet元组

val mapData: DataSet[(String, Int)] = source.map(line => line)

// 根据首个元素分组

val groupData = mapData.groupBy(_._1)

// 使用reduce聚合

val reduceData = groupData.reduce((x, y) => (x._1, x._2 + y._2))

// 打印测试

reduceData.print()

自定义Reduce函数,需要实现匿名类。

val reduceDataStream = keyedStream.reeduce(

new ReduceFunction[(String, Int)] {

override def reduce(t1: (String,Int),

t2: (String, Int)): (String, Int) = {

(t1._1, t1._2 + t2._2)

}

}

)

5.7 reduceGroup

将一个DataSet或者一个group聚合成一个或多个元素

// 使用 fromElements 构建数据源

val source: DataSet[(String, Int)] = env.fromElements(("java", 1), ("scala", 1), ("java", 1))

// 根据首个元素分组

val groupData = source.groupBy(_._1)

// 使用reduceGroup聚合

val result: DataSet[(String, Int)] = groupData.reduceGroup {

(in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>

val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))

out.collect(tuple)

}

// 打印测试

5.8 minBy和maxBy

选择有最大或最小值的元素

// 使用minBy操作,求List中每个人的最小值

// List("张三,1", "李四,2", "王五,3", "张三,4")

case class User(name: String, id: String)

// 将List转换为一个scala的样例类

val text: DataSet[User] = textDataSet.mapPartition(line => {

line.map(index => User(index._1, index._2))

})

val result = text

.groupBy(0) // 按照姓名分组

.minBy(1) // 每个人的最小值

5.9 Aggregate

在数据集上进行聚合求最值(最大 最小)

val data = new mutable.MutableList[(Int, String, Double)]

data.+=((1, "yuwen", 89.0))

data.+=((2, "shuxue", 92.2))

data.+=((3, "yuwen", 89.99))

// 使用 fromElements 构建数据源

val input: DataSet[(Int, String, Double)] = env.fromCollection(data)

// 使用group执行分组操作

val value = input.groupBy(1)

// 使用aggregate求最大值元素

.aggregate(Aggregations.MAX, 2)

// 打印测试

value.print()

只能用在元组上

注意: 要使用aggregate,只能使用字段索引名或索引名称来进行分组 groupBy(0) ,否则会报一下错误: Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.

5.10 distinct

去除重复数据

// 数据源使用上一题的

// 使用distinct操作,根据科目去除集合中重复的元组数据

val value: DataSet[(Int, String, Double)] = input.distinct(1)

value.print()

5.11 join

将两个DataSet以一定的条件连接到一起,形成新的DataSet

// s1 和 s2 数据集格式如下:

// DataSet[(Int, String,String, Double)]

val joinData = s1.join(s2) // s1数据集 join s2数据集

.where(0).equalTo(0) { // join的条件

(s1, s2) => (s1._1, s1._2, s2._2, s1._3)

}

5.12 leftOuterJoin

左外连接,左边的Dataset中的每一个元素,去连接右边的元素

此外还有:

rightOuterJoin:右外连接,左边的Dataset中的每一个元素,去连接左边的元素

fullOuterJoin:全外连接,左右两边的元素,全部连接

下面以 leftOuterJoin 进行示例:

val data1 = ListBuffer[Tuple2[Int,String]]()

data1.append((1,"zhangsan"))

data1.append((2,"lisi"))

data1.append((3,"wangwu"))

data1.append((4,"zhaoliu"))

val data2 = ListBuffer[Tuple2[Int,String]]()

data2.append((1,"beijing"))

data2.append((2,"shanghai"))

data2.append((4,"guangzhou"))

val text1 = env.fromCollection(data1)

val text2 = env.fromCollection(data2)

text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{

if(second==null){

(first._1,first._2,"null")

}else{

(first._1,first._2,second._2)

}

}).print()

5.13 cross

交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集

val cross = input1.cross(input2){

(input1 , input2) => (input1._1,input1._2,input1._3,input2._2)

}

cross.print()

5.14 union

将两个数据集合并,但不去重

val unionData: DataSet[String] = elements1.union(elements2).union(elements3)

// 去除重复数据

val value = unionData.distinct(line => line)

5.15 rebalance

解决数据倾斜,保证每个机器完成计算的时间相近

将数据均匀分配到每个机器上执行

// 使用rebalance操作,避免数据倾斜

val rebalance = filterData.rebalance()

5.16 partitionByHash

按指定的key进行hash分区

val data = new mutable.MutableList[(Int, Long, String)]

data.+=((1, 1L, "Hi"))

data.+=((2, 2L, "Hello"))

data.+=((3, 2L, "Hello world"))

val collection = env.fromCollection(data)

val unique = collection.partitionByHash(1).mapPartition{

line =>

line.map(x => (x._1 , x._2 , x._3))

}

unique.writeAsText("hashPartition", WriteMode.NO_OVERWRITE)

env.execute()

5.17partitionByRange

根据指定的key对数据集进行范围分区

val data = new mutable.MutableList[(Int, Long, String)]

data.+=((1, 1L, "Hi"))

data.+=((2, 2L, "Hello"))

data.+=((3, 2L, "Hello world"))

data.+=((4, 3L, "Hello world, how are you?"))

val collection = env.fromCollection(data)

val unique = collection.partitionByRange(x => x._1).mapPartition(line => line.map{

x=>

(x._1 , x._2 , x._3)

})

unique.writeAsText("rangePartition", WriteMode.OVERWRITE)

env.execute()

5.18 sortPatition

根据指定的字段值进行分区的排序

val data = new mutable.MutableList[(Int, Long, String)]

data.+=((1, 1L, "Hi"))

data.+=((2, 2L, "Hello"))

data.+=((3, 2L, "Hello world"))

data.+=((4, 3L, "Hello world, how are you?"))

val ds = env.fromCollection(data)

val result = ds

.map { x => x }.setParallelism(2)

.sortPartition(1, Order.DESCENDING)//第一个参数代表按照哪个字段进行分区

.mapPartition(line => line)

.collect()

println(result)

3.Sink输出算子

3.1 collect

将数据输出到本地集合

result.collect()

3.2 writeAsText

将数据输出到文件

Flink支持多种存储设备上的文件,包括本地文件,hdfs文件等

Flink支持多种文件的存储格式,包括text文件,CSV文件等

// 将数据写入本地文件

result.writeAsText("/data/a", WriteMode.OVERWRITE)

// 将数据写入HDFS

result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: