文章目录

概述第一部分:基础概念01-基础概念【DataStream】02-基础概念【并行度设置】03-基础概念【资源槽Slot】

第二部分:Data Source & Data Sink04-Data Source【基本数据源】① 集合Collection数据源② 文件File数据源

05-Data Source【自定义数据源】06-Data Source【MySQL Source】07-Data Sink【MySQL Sink】

第三部分:DataStream Transformations08-Transformation【算子概述】09-Transformation【map 算子】10-Transformation【flatMap 算子】11-Transformation 【filter 算子 】12-Transformation【keyBy 算子 】13-Transformation【reduce 算子】14-Transformation【max和min 算子 】15-Transformation【union和connect 算子 】16-[掌握]-Transformation 【Side Outputs】

概述

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流 处理和批处理两种类型应用的功能。

第一部分:基础概念

01-基础概念【DataStream】

​ 在Flink计算引擎中,将数据当做:数据流DataStream,分为有界数据流和无界数据流。

​ [任何类型的数据都可以形成一种事件流,如信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。]

1)、有边界流(bounded stream):==有定义流的开始,也有定义流的结束。==有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。2)、无边界流(unbounded stream):有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

DataStream(数据流)官方定义:

DataStream(数据流)源码中定义:

DataStream有如下几个子类:

1)、DataStreamSource:

表示从数据源直接获取数据流DataStream,比如从Socket或Kafka直接消费数据 2)、KeyedStream:

当DataStream数据流进行分组时(调用keyBy),产生流称为KeyedStream,按照指定Key分组;通常情况下数据流被分组以后,需要进行窗口window操作或聚合操作。 3)、SingleOutputStreamOperator:

当DataStream数据流没有进行keyBy分组,而是使用转换函数,产生的流称为SingleOutputStreamOperator。比如使用filter、map、flatMap等函数,产生的流就是SingleOutputStreamOperator 4)、IterativeStream:迭代流,表示对流中数据进行迭代计算,比如机器学习,图计算等。

DataStream类是泛型(类型参数),数据类型支持如下所示:

在Flink计算引擎中,提供4个层次API,如下所示:

Flink中流计算DataStream层次API在使用时,还是包括三个方面:Source/Transformation/Sink

基于Flink开发流式计算程序五个步骤:

# 1)、Obtain an execution environment,

执行环境-env:StreamExecutionEnvironment

# 2)、Load/create the initial data,

数据源-source:DataStream

# 3)、Specify transformations on this data,

数据转换-transformation:DataStream API(算子,Operator)

# 4)、Specify where to put the results of your computations,

数据接收器-sink

# 5)、Trigger the program execution

触发执行-execute

在IDEA中创建Flink Stream流计算编程模板:FlinkClass

模块中内容:FlinkClass

#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "") package ${PACKAGE_NAME};#end

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

*

* @author xuanyu

*/

public class ${NAME} {

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. 数据源-source

// 3. 数据转换-transformation

// 4. 数据终端-sink

// 5. 触发执行-execute

env.execute("${NAME}") ;

}

}

依据上述定义FlinkStream模块Template,创建Flink Stream编程类:StreamDemo

02-基础概念【并行度设置】

一个Flink程序由多个Operator组成(source、transformation和 sink)。

​ 一个Operator由多个并行的SubTask(以线程方式)来执行, 一个Operator的并行SubTask(数目就被称为该Operator(任务)的并行度(Parallelism)。

在Flink 中,并行度设置可以从4个层次级别指定,具体如下所示:

1)、Operator Level(算子级别)(可以使用)

一个operator、source和sink的并行度可以通过调用 setParallelism()方法来指定。

2)、Execution Environment Level(Env级别,可以使用)

执行环境并行度可以通过调用setParallelism()方法指定。

3)、Client Level(客户端级别,推荐使用)

并行度可以在客户端将job提交到Flink时设定,对于CLI客户端,可以通过-p参数指定并行度

4)、System Level(系统默认级别,尽量不使用)

​ 在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度。

package cn.itqzd.flink.parallelism;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

/**

* 使用Flink计算引擎实现实时流计算:词频统计WordCount,从TCP Socket消费数据,结果打印控制台。

1.执行环境-env

2.数据源-source

3.数据转换-transformation

4.数据接收器-sink

5.触发执行-execute

* @author xuyuan

*/

public class WordCountParallelism {

/**

* 当运行Flink 程序时,传递参数,获取对应host和port值

* todo: WordCount --host node1 --port 9999

*/

public static void main(String[] args) throws Exception {

// 构建参数解析工具类ParameterTool

ParameterTool parameterTool = ParameterTool.fromArgs(args);

if(parameterTool.getNumberOfParameters() != 2){

System.out.println("Usage: WordCount --host --port ............");

System.exit(-1);

}

String host = parameterTool.get("host");

int port = parameterTool.getInt("port", 9999) ;

// 1.执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;

// todo: 执行环境级别并行度

env.setParallelism(2) ;

// 2.数据源-source

DataStreamSource inputDataStream = env.socketTextStream(host, port);

// 3.数据转换-transformation

/*

流中每条数据: flink spark flink

|

词频统计,步骤与批处理完全一致

*/

// 3-1. 分割单词

SingleOutputStreamOperator wordDataStream = inputDataStream.flatMap(

new FlatMapFunction() {

@Override

public void flatMap(String value, Collector out) throws Exception {

String[] words = value.split("\\s+");

for (String word : words) {

out.collect(word);

}

}

}

);

// 3-2. 转换为二元组

SingleOutputStreamOperator> tupleDataStream = wordDataStream.map(

new MapFunction>() {

@Override

public Tuple2 map(String value) throws Exception {

return Tuple2.of(value, 1);

}

}

);

// 3-3. 按照单词分组,并且组求和

SingleOutputStreamOperator> resultDataStream = tupleDataStream

.keyBy(0)

.sum(1);

// 4.数据接收器-sink, todo: 算子级别设置并行度,优先级最高

resultDataStream.print().setParallelism(1);

// 5.触发执行-execute

env.execute("StreamWordCount");

}

}

总结:并行度的优先级:算子级别 > env级别 > Client级别 > 系统默认级别

1)、如果source不可以被并行执行,即使指定了并行度为多个,也不会生效2)、实际生产中,推荐在算子级别显示指定各自的并行度,方便进行显示和精确的资源控制。3)、slot是静态的概念,是指taskmanager具有的并发执行能力; parallelism是动态的概念,是指程序运行时实际使用的并发能力。

03-基础概念【资源槽Slot】

Flink中运行Task任务(SubTask)在Slot资源槽中: [Slot为子任务SubTask运行资源抽象,每个TaskManager运行时设置Slot个数。]

官方建议:

Slot资源槽个数 = CPU Core核数

也就是说,

分配给TaskManager多少CPU Core核数,可以等价为Slot个数

每个TaskManager运行时设置内存大小:[TaskManager中内存平均划分给Slot]。

举例说明:

假设TaskManager中分配内存为:4GB,Slot个数为4个,此时每个Slot分配内存就是 4GB / 4 = 1GB 内存

每个Slot中运行SubTask子任务,以线程Thread方式运行。

1个Job中不同类型SubTask任务,可以运行在同一个Slot中,称为:[Slot Sharded 资源槽共享]1个Job中相同类型SubTask任务必须运行在不同Slot中

第二部分:Data Source & Data Sink

DataStream API 编程

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview

# 1、Data Sources 数据源

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sources

# 2、DataStream Transformations

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/

# 3、Data Sinks 数据接收器

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sinks

04-Data Source【基本数据源】

​ 针对Flink 流计算来说,数据源可以是有界数据源(静态数据),也可以是无界数据源(流式数据),原因在于Flink框架中,将数据统一封装称为DataStream数据流。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sources

1、基于File文件数据源

readTextFile(path)

2、Sockect 数据源

socketTextStream

3、基于Collection数据源

fromCollection(Collection)

fromElements(T ...)

fromSequence(from, to),相当于Python中range

4、自定义Custom数据源

env.addSource()

官方提供接口:

SourceFunction 非并行

RichSourceFunction

ParallelSourceFunction 并行

RichParallelSourceFunction

① 集合Collection数据源

基于集合Collection数据源Source,一般用于学习测试。

package cn.itqzd.flink.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**

* Flink 流计算数据源:基于集合的Source,分别为可变参数、集合和自动生成数据

* TODO: 基于集合数据源Source构建DataStream,属于有界数据流,当数据处理完成以后,应用结束

*/

public class StreamSourceCollectionDemo {

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1) ;

// 2. 数据源-source

// 方式一:可变参数

DataStreamSource dataStream01 = env.fromElements("spark", "flink", "mapreduce");

dataStream01.print();

// 方式二:集合对象

DataStreamSource dataStream02 = env.fromCollection(Arrays.asList("spark", "flink", "mapreduce"));

dataStream02.printToErr();

// 方式三:自动生成序列数字

DataStreamSource dataStream03 = env.fromSequence(1, 10);

dataStream03.print();

// 5. 触发执行-execute

env.execute("StreamSourceCollectionDemo") ;

}

}

② 文件File数据源

基于文件数据源, 一般用于学习测试,演示代码如下所示:

从文本文件加载数据时,可以是压缩文件,支持压缩格式如下图。

案例演示代码:StreamSourceFileDemo

package cn.itqzd.flink.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

* Flink 流计算数据源:基于文件的Source

*/

public class StreamSourceFileDemo {

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1) ;

// 2. 数据源-source

// 方式一:读取文本文件

DataStreamSource dataStream01 = env.readTextFile("datas/words.txt");

dataStream01.printToErr();

// 方式二:读取压缩文件

DataStreamSource dataStream02 = env.readTextFile("datas/words.txt.gz");

dataStream02.print();

// 5. 触发执行-execute

env.execute("StreamSourceFileDemo") ;

}

}

05-Data Source【自定义数据源】

在Flink 流计算中,提供数据源Source接口,用户实现自定义数据源,可以从任何地方获取数据。

1、SourceFunction:

非并行数据源(并行度parallelism=1)

2、RichSourceFunction:

多功能非并行数据源(并行度parallelism=1)

3、ParallelSourceFunction:

并行数据源(并行度parallelism>=1)

4、RichParallelSourceFunction:

多功能并行数据源(parallelism>=1),Kafka数据源使用该接口

实际项目中,如果自定义数据源,实现接口:RichSourceFunction或RichParallelSourceFunction。

查看SourceFunction接口中方法:

# 第一个方法:run

实时从数据源端加载数据,并且发送给下一个Operator算子,进行处理

实时产生数据

# 第二个方法:cancel

字面意思:取消

当将Job作业取消时,不在从数据源端读取数据

# 总结:当基于数据源接口自定义数据源时,只要实现上述2个 方法即可。

需求:每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)

创建类:OrderSource,实现接口【RichParallelSourceFunction】,实现其中run和cancel方法。

编程实现自定义数据源:StreamSourceOrderDemo,实时产生交易订单数据。

package cn.itqzd.flink.source;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;

import java.util.UUID;

import java.util.concurrent.TimeUnit;

/**

* 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)

* - 随机生成订单ID:UUID

* - 随机生成用户ID:0-2

* - 随机生成订单金额:0-100

* - 时间戳为当前系统时间:current_timestamp

*/

public class StreamSourceOrderDemo {

@Data

@NoArgsConstructor

@AllArgsConstructor

public static class Order {

private String id;

private Integer userId;

private Double money;

private Long orderTime;

}

/**

* 自定义数据源,继承抽象类:RichParallelSourceFunction,并行的和富有的

*/

private static class OrderSource extends RichParallelSourceFunction {

// 定义变量,用于标识是否产生数据

private boolean isRunning = true ;

// 表示产生数据,从数据源Source源源不断加载数据

@Override

public void run(SourceContext ctx) throws Exception {

Random random = new Random();

while (isRunning){

// 产生交易订单数据

Order order = new Order(

UUID.randomUUID().toString(), //

random.nextInt(2), //

(double)random.nextInt(100), //

System.currentTimeMillis()

);

// 发送交易订单数据

ctx.collect(order);

// 每隔1秒产生1条数据,休眠1秒钟

TimeUnit.SECONDS.sleep(1);

}

}

// 取消从数据源加载数据

@Override

public void cancel() {

isRunning = false ;

}

}

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;

env.setParallelism(1);

// 2. 数据源-source

DataStreamSource orderDataStream = env.addSource(new OrderSource());

// 3. 数据转换-transformation

// 4. 数据接收器-sink

orderDataStream.printToErr();

// 5. 触发执行-execute

env.execute("StreamSourceOrderDemo") ;

}

}

运行流式计算程序,查看模拟产生订单数据:

06-Data Source【MySQL Source】

需求:从MySQL中实时加载数据,要求MySQL中的数据有变化,也能被实时加载出来

1)、数据准备

CREATE DATABASE IF NOT EXISTS db_flink ;

CREATE TABLE IF NOT EXISTS db_flink.t_student (

id int(11) NOT NULL AUTO_INCREMENT,

name varchar(255) DEFAULT NULL,

age int(11) DEFAULT NULL,

PRIMARY KEY (id)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

INSERT INTO db_flink.t_student VALUES ('1', 'jack', 18);

INSERT INTO db_flink.t_student VALUES ('2', 'tom', 19);

INSERT INTO db_flink.t_student VALUES ('3', 'rose', 20);

INSERT INTO db_flink.t_student VALUES ('4', 'tom', 19);

INSERT INTO db_flink.t_student VALUES ('5', 'jack', 18);

INSERT INTO db_flink.t_student VALUES ('6', 'rose', 20);

2)、自定义数据源:MysqlSource

实现run方法,实现每隔1秒加载1次数据库表数据,此时数据有更新都会即使查询。

package cn.itqzd.flink.source;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.util.concurrent.TimeUnit;

/**

* 从MySQL中实时加载数据:要求MySQL数据的数据有变化,也能被实时加载出来(每隔几秒钟加载一次数据)

* @author xuyuan

*/

public class StreamSourceMysqlDemo {

@Data

@AllArgsConstructor

@NoArgsConstructor

static class Student{

private Integer id ;

private String name ;

private Integer age ;

}

/**

* 自定义数据源,从MySQL表中加载数据,并且实时增量加载,每隔5秒钟加载一次

*/

private static class MysqlSource extends RichSourceFunction {

// 定义变量,标识是否加载数据

private boolean isRunning = true ;

// 定义变量

private Connection conn = null ;

private PreparedStatement pstmt = null ;

private ResultSet result = null ;

// todo: 在运行run方法之前初始化操作,比如获取连接

@Override

public void open(Configuration parameters) throws Exception {

// step1. 加载驱动

Class.forName("com.mysql.jdbc.Driver") ;

// step2. 获取连接

conn = DriverManager.getConnection(

"jdbc:mysql://node1:3306/?useSSL=false", "root", "123456"

);

// step3. 创建Statement对象

pstmt = conn.prepareStatement("SELECT id, name, age FROM db_flink.t_student") ;

}

@Override

public void run(SourceContext ctx) throws Exception {

while (isRunning){

// step4. 执行操作

result = pstmt.executeQuery();

// step5. 获取数据

while (result.next()){

// 获取每条数据

int stuId = result.getInt("id");

String stuName = result.getString("name");

int stuAge = result.getInt("age");

// todo: 封装数据到实体类对象中

Student student = new Student(stuId, stuName, stuAge);

// 发送数据到下游

ctx.collect(student);

}

// 每隔5秒加载一次数据

TimeUnit.SECONDS.sleep(5);

}

}

@Override

public void cancel() {

isRunning = false ;

}

// todo: 当run方法运行结束以后,收尾工作,比如释放资源,关闭连接

@Override

public void close() throws Exception {

// step6. 关闭连接

if(null != result) {

result.close();

}

if(null != pstmt ) {

pstmt.close();

}

if(null != conn) {

conn.close();

}

}

}

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1) ;

// 2. 数据源-source

MysqlSource mysqlSource = new MysqlSource();

DataStreamSource studentDataStream = env.addSource(mysqlSource);

// 3. 数据转换-transformation

// 4. 数据终端-sink

studentDataStream.printToErr();

// 5. 触发执行-execute

env.execute("StreamSourceMySQLDemo");

}

}

07-Data Sink【MySQL Sink】

Flink 流计算中数据接收器Sink,基本数据保存和自定义Sink保存。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sinks

# 1、写入文件,API已经过时,不推荐使用

writeAsText

writeAsCsv

# 2、打印控制台,开发测试使用

print,标准输出

printToErr,错误输出

# 3、写入Socket

很少使用

# 4、自定义数据接收器Sink

Sink接口:SinkFunction、RichSinkFunction

datastream.addSink 添加流式数据输出Sink

# 需求:Flink 流式计算程序,实时从Kafka消费数据(保险行业),将数据ETL转换,存储到HBase表

Flink 1.10版本中,DataStream未提供与HBase集成Connector连接器

自定实现SinkFunction接口,向HBase表写入数据即可

https://www.jianshu.com/p/1c29750ed814

将数据写入文件方法:writeAsText和writeAsCsv全部过时,提供新的Connector:StreamingFileSink

需求:将Flink集合中的数据集DataStream,通过自定义Sink保存到MySQL。

CREATE DATABASE IF NOT EXISTS db_flink ;

CREATE TABLE IF NOT EXISTS db_flink.t_student (

id int(11) NOT NULL AUTO_INCREMENT,

name varchar(255) DEFAULT NULL,

age int(11) DEFAULT NULL,

PRIMARY KEY (id)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

INSERT INTO db_flink.t_student VALUES ('100', 'zhangsan', 24);

[当自定义Flink中Sink时,需要实现接口:SinkFunction或RichSinkFunction]

package cn.itqzd.flink.sink;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

/**

* 案例演示:自动Sink,将数据保存到MySQL表中,实现类RichSinkFunction

* @author xuyuan

*/

public class StreamSinkMysqlDemo {

@Data

@AllArgsConstructor

@NoArgsConstructor

public static class Student{

private Integer id ;

private String name ;

private Integer age ;

}

/**

* 自定义Sink接收器,将DataStream中数据写入到MySQL数据库表中

*/

private static class MysqlSink extends RichSinkFunction {

// 定义变量

private Connection conn = null ;

private PreparedStatement pstmt = null ;

// todo: 初始化操作,比如获取连接

@Override

public void open(Configuration parameters) throws Exception {

// step1. 加载驱动

Class.forName("com.mysql.jdbc.Driver") ;

// step2. 获取连接

conn = DriverManager.getConnection(

"jdbc:mysql://node1:3306/?useSSL=false", "root", "123456"

);

// step3. 创建Statement对象

pstmt = conn.prepareStatement("INSERT INTO db_flink.t_student(id, name, age) VALUES (?, ?, ?)") ;

}

// todo: 数据流中每条数据进行输出操作,调用invoke方法

@Override

public void invoke(Student student, Context context) throws Exception {

// step4、执行操作,先设置占位符值

pstmt.setInt(1, student.id);

pstmt.setString(2, student.name);

pstmt.setInt(3, student.age);

pstmt.execute();

}

// todo: 收尾工作,比如关闭连接,释放资源

@Override

public void close() throws Exception {

// step5. 关闭连接

if(null != pstmt) {

pstmt.close();

}

if(null != conn) {

conn.close();

}

}

}

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1) ;

// 2. 数据源-source

DataStreamSource inputDataStream = env.fromElements(

new Student(21, "wangwu", 20),

new Student(22, "zhaoliu", 19),

new Student(23, "laoda", 25),

new Student(24, "laoer", 23),

new Student(25, "laosan", 21)

);

// 3. 数据转换-transformation

// 4. 数据终端-sink

MysqlSink mysqlSink = new MysqlSink() ;

inputDataStream.addSink(mysqlSink) ;

// 5. 触发执行-execute

env.execute("StreamSinkMySQLDemo");

}

}

第三部分:DataStream Transformations

08-Transformation【算子概述】

从DataStream数据流转换角度看Transformation算子(函数),有如下四类操作:

文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/

1)、第一类是对于单条记录的操作,比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作) 2)、第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过 Window 将需要的记录关联到一起进行处理 3)、第三类是对多个流进行操作合并转换为单个流。例如,多个流可以通过 Union、Join 或Connect 等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作。 4)、第四类是DataStream支持与合并对称的拆分操作,即把一个流按一定规则拆分为多个流(Split 操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。

先讲解一些DataStream中最基本Operator算子使用,也是使用比较多。

09-Transformation【map 算子】

map函数使用说明:

需求:将读取文本文件数据,每行JSON格式数据,转换为ClickLog对象,使用map函数完成。

​ [将JSON格式字符串,解析转换为JavaBean对象,使用库:fastJson库]

package cn.itqzd.flink.transformation;

import com.alibaba.fastjson.JSON;

import lombok.Data;

import org.apache.commons.lang3.time.DateFormatUtils;

import org.apache.flink.api.common.functions.FilterFunction;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

/**

* Flink中流计算DataStream转换函数:map、flatMap和filter

*/

public class TransformationBasicDemo {

@Data

private static class ClickLog {

//频道ID

private long channelId;

//产品的类别ID

private long categoryId;

//产品ID

private long produceId;

//用户的ID

private long userId;

//国家

private String country;

//省份

private String province;

//城市

private String city;

//网络方式

private String network;

//来源方式

private String source;

//浏览器类型

private String browserType;

//进入网站时间

private Long entryTime;

//离开网站时间

private Long leaveTime;

}

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 2. 数据源-source

DataStream inputDataStream = env.readTextFile("datas/click.log");

// 3. 数据转换-transformation

// TODO: 函数一【map函数】,将JSON转换为JavaBean对象

DataStream clickDataStream = inputDataStream.map(new MapFunction() {

@Override

public ClickLog map(String line) throws Exception {

return JSON.parseObject(line, ClickLog.class);

}

});

clickDataStream.printToErr();

// 5. 触发执行-execute

env.execute("TransformationBasicDemo") ;

}

}

10-Transformation【flatMap 算子】

flatMap:将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果,flatMap = map + flattern

案例演示说明:依据访问网站时间戳转换为不同时间日期格式数据

Long类型日期时间: 1577890860000

|

|进行格式

|

String类型日期格式

yyyy-MM-dd-HH

yyyy-MM-dd

yyyy-MM

// TODO: 函数二【flatMap】,每条数据转换为日期时间格式

/*

Long类型日期时间: 1577890860000

|

|进行格式

|

String类型日期格式

yyyy-MM-dd-HH

yyyy-MM-dd

yyyy-MM

*/

DataStream flatMapDataStream = clickDataStream.flatMap(new FlatMapFunction() {

@Override

public void flatMap(ClickLog clickLog, Collector out) throws Exception {

// 获取访问数据

Long entryTime = clickLog.getEntryTime();

// 格式一:yyyy-MM-dd-HH

String hour = DateFormatUtils.format(entryTime, "yyyy-MM-dd-HH");

out.collect(hour);

// 格式二:yyyy-MM-dd

String day = DateFormatUtils.format(entryTime, "yyyy-MM-dd");

out.collect(day);

// 格式三:yyyy-MM

String month = DateFormatUtils.format(entryTime, "yyyy-MM");

out.collect(month);

}

});

//flatMapDataStream.printToErr();

11-Transformation 【filter 算子 】

filter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素

需求:过滤出clickLog中使用谷歌浏览器访问的日志

// TODO: 函数三【filter函数】,过滤使用谷歌浏览器数据

DataStream filterDataStream = clickDataStream.filter(new FilterFunction() {

@Override

public boolean filter(ClickLog clickLog) throws Exception {

return "谷歌浏览器".equals(clickLog.getBrowserType());

}

});

//filterDataStream.printToErr();

12-Transformation【keyBy 算子 】

​ keyBy算子表示:按照指定的key来对流中的数据进行分组,分组后流称为KeyedStream,要么聚合操作(调用reduce、fold或aggregate函数等等),要么进行窗口操作window。

​ 在Flink中如果是批处理,分组使用函数:groupBy,从Flink 1.12以后开始,由于流批一体,无论是流计算还是批处理,分组函数:keyBy。

​ 在使用keyBy函数时,可以指定下标索引(数据类型为元组)、指定属性名称(数据类型为JavaBean)或指定KeySelector选择器。

案例代码演示:从TCP Socket消费数据,进行词频统计,先过滤和分词,再分组和聚合。

package cn.itqzd.flink.transformation;

import org.apache.flink.api.common.functions.FilterFunction;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

/**

* Flink中流计算DataStream转换算子:keyBy和sum 算子

* @author xuyuan

*/

public class TransformationKeyByDemo {

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1) ;

// 2. 数据源-source

DataStreamSource inputDataStream = env.socketTextStream("node1", 9999);

// 3. 数据转换-transformation

/*

flink spark flink

|flatMap

flink, spark, flink

|map

(flink, 1), (spark, 1), (flink, 1)

|keyBy(0)

flink -> [(flink, 1), (flink, 1)] spark -> [(spark, 1)]

|sum(1): todo -> 分组后,对每个组内数据求和操作

flink -> 1 + 1 = 2, spark -> 1 = 1

*/

// 3-0. 过滤掉空字符串

SingleOutputStreamOperator lineDataStream = inputDataStream.filter(new FilterFunction() {

@Override

public boolean filter(String line) throws Exception {

return line.trim().length() > 0;

}

});

// 3-1. 分割单词,使用map算子

SingleOutputStreamOperator wordDataStream = lineDataStream.flatMap(new FlatMapFunction() {

@Override

public void flatMap(String line, Collector out) throws Exception {

String[] words = line.split("\\s+");

for (String word : words) {

out.collect(word);

}

}

});

// 3-2. 转换为二元组

SingleOutputStreamOperator> tupleDataStream = wordDataStream.map(new MapFunction>() {

@Override

public Tuple2 map(String word) throws Exception {

return Tuple2.of(word, 1);

}

});

// 3-3. 按照单词分组,并且组内求和

/*

.keyBy(new KeySelector, String>() {

@Override

public String getKey(Tuple2 tuple) throws Exception {

return tuple.f0;

}

})

*/

SingleOutputStreamOperator> resultDataStream = tupleDataStream

// lambda 表达式写法

.keyBy(tuple -> tuple.f0)

.sum(1);

// 4. 数据终端-sink

resultDataStream.printToErr();

// 5. 触发执行-execute

env.execute("TransformationKeyByDemo");

}

}

13-Transformation【reduce 算子】

[reduce:对Key分组中的元素进行聚合],有2个参数 (x, y),其中 x :tmp为聚合中间临时变量, y:item为聚合中每个元素。

​ [reduce 算子,仅仅针对DataStream被keyBy分组后KeyedStream数据进行聚合]

案例代码演示:修改词频统计WordCount程序,使用reduce代替 进行组内数据求和。

package cn.itqzd.flink.transformation;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.ReduceFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

/**

* Flink中流计算DataStream转换函数:reduce聚合函数

*/

public class TransformationReduceDemo {

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(2);

// 2. 数据源-source

DataStream inputDataStream = env.socketTextStream("node1", 9999);

// 3. 数据转换-transformation

// todo: 过滤脏数据和转换为单词而源自

SingleOutputStreamOperator> tupleDataStream = inputDataStream

// 3-1. 过滤脏数据,todo:Java 8 提供Lambda表达式

.filter(line -> line.trim().length() > 0)

// 3-2. 每行数据分割为单词,并转换为二元组

.flatMap(new FlatMapFunction>() {

@Override

public void flatMap(String line, Collector> out) throws Exception {

String[] words = line.trim().split("\\s+");

for (String word : words) {

out.collect(Tuple2.of(word, 1));

}

}

});

// todo: 3-3. 使用reduce算子,对keyBy分组后流数据进行聚合操作(组内求和)

SingleOutputStreamOperator> outputDataStream = tupleDataStream

.keyBy(tuple -> tuple.f0)

.reduce(new ReduceFunction>() {

// todo: sc.parallisize([1, 2, 3, 4, 5]).reduce(lambda tmp, item: tmp + item)), tmp 初始值为分区第一个元素

@Override

public Tuple2 reduce(Tuple2 tmp,

Tuple2 item) throws Exception {

System.out.println("tmp = " + tmp + ", item = " + item);

/*

tmp:表示keyBy分组中每个Key对应结果值

key -> spark, tmp -> (spark, 10)

todo: 如果第一次对key数据聚合,直接将数据赋值给tmp

item: 表示使用keyBy分组后组内数据

(spark, 1)

*/

// 获取以前计算你只

Integer historyValue = tmp.f1;

// 获取现在传递值

Integer currentValue = item.f1;

// 计算最新值

int latestValue = historyValue + currentValue ;

//返回结果

return Tuple2.of(tmp.f0, latestValue);

}

});

// 4. 数据接收器-sink

outputDataStream.printToErr();

// 5. 触发执行-execute

env.execute("TransformationReduceDemo");

}

}

14-Transformation【max和min 算子 】

在DataStream API中,对数据按照keyBy分组,直接获取最大或最小值函数:min与minBy,及max与maxBy。

max或min:只会求出最大或最小的那个字段,其他的字段不管maxBy或minBy:会求出最大或最小的那个字段和对应的其他的字段

package cn.itqzd.flink.transformation;

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

* Flink 中流计算DataStream转换算子:max或maxBy、min或minBy

* @author xuyuan

*/

public class TransformationMaxMinDemo {

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 2. 数据源-source

DataStream> inputDataStream = env.fromElements(

Tuple3.of("上海", "浦东新区", 777),

Tuple3.of("上海", "闵行区", 999),

Tuple3.of("上海", "杨浦区", 666),

Tuple3.of("北京", "东城区", 567),

Tuple3.of("北京", "西城区", 987),

Tuple3.of("上海", "静安区", 888),

Tuple3.of("北京", "海淀区", 9999)

);

// 3. 数据转换-transformation

// todo: max 最大值, 只关心指定字段最大值,其他字段不关心

DataStream> maxDataStream = inputDataStream

.keyBy(tuple -> tuple.f0)

.max(2);

maxDataStream.printToErr("max>");

// todo: maxBy 最大值,关心其他字段

DataStream> maxByDataStream = inputDataStream

.keyBy(tuple -> tuple.f0)

.maxBy(2);

maxByDataStream.printToErr("maxBy>");

// 4. 数据终端-sink

![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/034dddc7b02b4b428baea6f3d953b3fb.png#pic_center)

// 5. 触发执行-execute

env.execute("TransformationMaxMinDemo");

}

}

15-Transformation【union和connect 算子 】

1)union函数:可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。

​ [数据将按照先进先出(First In First Out)的模式合并,且不去重。]

2)、connect函数:与union函数功能类似,用来连接两个数据流,且2个数据流数据类型可不一样。

connect只能连接两个数据流,union可以连接多个数据流;connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致;

案例演示,分别对2个流DataStream进行union和connect操作:

将两个String类型的流进行union;将一个String类型和一个Long类型的流进行connect;

package cn.itqzd.flink.transformation;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**

* Flink中流计算DataStream转换算子:合并union和连接connect

* @author xuyuan

*/

public class TransformationUnionConnectDemo {

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1) ;

// 2. 数据源-source

DataStream dataStream01 = env.fromElements("A", "B", "C", "D");

DataStream dataStream02 = env.fromElements("aa", "bb", "cc", "dd");

DataStream dataStream03 = env.fromElements(1, 2, 3, 4);

// 3. 数据转换-transformation

// todO: 2个流进行union,要求流中数据类型必须相同

DataStream unionDataStream = dataStream01.union(dataStream02);

// unionDataStream.printToErr() ;

// todo: 2个流进行连接,connect 应用场景 -> 大表与小表维度关联

ConnectedStreams connectDataStream = dataStream01.connect(dataStream03);

// 对连接流中数据必须进行处理,才可以输出,需要调用转换算子:比如map、flatMap都可以

SingleOutputStreamOperator mapDataStream = connectDataStream.map(

// interface CoMapFunction

new CoMapFunction() {

// 连接流时,左边数据流中数据操作

@Override

public String map1(String value) throws Exception {

return "map1: left -> " + value;

}

// 连接流时,右边数据流中数据操作

@Override

public String map2(Integer value) throws Exception {

return "map2: right -> " + value;

}

}

);

mapDataStream.printToErr();

// 4. 数据终端-sink

// 5. 触发执行-execute

env.execute("TransformationUnionConnectDemo");

}

}

16-[掌握]-Transformation 【Side Outputs】

在Flink流计算中,提供API函数,将1个流分割为多个流,使用split算子和select算子。

Split就是将一个流分成多个流,Select就是获取分流后对应的数据。

​ [DataStream中split函数,分割流的本质:给DataStream流中每条数据打上标签Tag,最后依据标签Tag获取具体分割的流数据。]

​ 分割流函数split已经过时,并且在新版本中已经被删除,Flink提供:侧边输出SideOutput方式,可以将1个流进行侧边输出多个流。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/side_output/

第1步、定义输出标签OutputTag

OutputTag outputTag = new OutputTag("side-output") {};比如需要输出2个流,定义2个OutputTag标签 第2步、调用DataStream中process底层处理函数,进行判断,划分OutputTag。

案例演示:对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

package cn.itqzd.flink.transformation;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.util.Collector;

import org.apache.flink.util.OutputTag;

/**

* Flink 流计算中转换算子:使用侧边流SideOutputs

*/

public class TransformationSideOutputsDemo {

public static void main(String[] args) throws Exception {

// 1. 执行环境-env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 2. 数据源-source

DataStreamSource inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// 3. 数据转换-transformation

/*

对数据流进行分割,使用sideOutput侧边输出算子实现,将奇数数字放在一个流,将偶数数字放在一个流,todo:原来数据流中数据平方处理

*/

// step1、定义分割流标签

OutputTag oddTag = new OutputTag("side-odd") {};

OutputTag evenTag = new OutputTag("side-even") {};

// step2、调用process函数,对流中数据处理及打标签

SingleOutputStreamOperator mainStream = inputStream.process(new ProcessFunction() {

// 表示对流中每条数据处理

@Override

public void processElement(Integer value, Context ctx, Collector out) throws Exception {

// todo: 流中每条数据原来该怎么计算依然如何计算,比如值平方

int squareValue = value * value ;

out.collect(squareValue + "");

// step3、判断数据是奇数还是偶数,打上对应标签

if(value % 2 == 0){

ctx.output(evenTag, value);

}else{

ctx.output(oddTag, value);

}

}

});

// 4. 数据终端-sink

mainStream.printToErr();

// step4、获取侧边流,依据标签

DataStream oddStream = mainStream.getSideOutput(oddTag);

oddStream.print("odd>");

DataStream evenStream = mainStream.getSideOutput(evenTag);

evenStream.print("even>");

// 5. 触发执行-execute

env.execute("TransformationSideOutputsDemo");

}

}

运行上述流式计算程序,可以发现,原来数据流继续处理数据,依据OutputTag衍生侧边流,各自单独处理数据。

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: