Flink Learning Notes

Preface: Today is day 13 of my Flink studies! I covered two advanced features: the ProcessFunction API and dual-stream joins. They address incremental aggregation of data in the big-data world, as well as widening fast-changing streams, i.e., merging several changing data sources into one. Combining my own experiments and code practice, I have written up my understanding and ideas here, and I look forward to exchanging thoughts with you!


Table of Contents

Flink Learning Notes
IV. Flink Advanced and New Features
  2. Process Function API
    2.1 Process Function Types
    2.2 KeyedProcessFunction [Key Topic]
    2.3 ProcessWindowFunction with Incremental Aggregation
      2.3.1 Usage Overview
      2.3.2 Incremental Window Aggregation with ReduceFunction
      2.3.3 Incremental Window Aggregation with AggregateFunction
      2.3.4 Using per-window state in ProcessWindowFunction
  3. Dual-Stream Joins
    3.1 Interview Notes
    3.2 Window Join
      3.2.1 Tumbling Window Join
      3.2.2 Sliding Window Join
      3.2.3 Session Window Join
      3.2.4 Case Demo
    3.3 Interval Join
      3.3.1 Interval Join Basics
      3.3.2 Case Demo

IV. Flink Advanced and New Features

2. Process Function API

The transformation operators covered earlier cannot access timestamp or watermark information, but a Process Function can read timestamps and watermarks and register timers. Flink SQL is itself implemented with Process Functions.

2.1 Process Function Types

1- ProcessFunction: for a plain DataStream
2- KeyedProcessFunction: for a keyed DataStream
3- CoProcessFunction: for two streams combined with connect
4- ProcessJoinFunction: for join operations on streams
5- BroadcastProcessFunction: for broadcast streams
6- KeyedBroadcastProcessFunction: for broadcast streams after keyBy
7- ProcessWindowFunction: incremental aggregation over windows
8- ProcessAllWindowFunction: full-window aggregation
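CoProcessFunction, for instance, is not demonstrated later in these notes, so here is a minimal sketch of my own (the host, ports, and class name are placeholders, not from the course code): connect pairs two streams of possibly different types, and each side gets its own processElement callback.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class CoProcessSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> left = env.socketTextStream("node1", 9998);
        DataStream<String> right = env.socketTextStream("node1", 9999);
        left.connect(right)
            .process(new CoProcessFunction<String, String, String>() {
                @Override
                public void processElement1(String value, Context ctx, Collector<String> out) {
                    // called once per element of the first stream
                    out.collect("left: " + value);
                }
                @Override
                public void processElement2(String value, Context ctx, Collector<String> out) {
                    // called once per element of the second stream
                    out.collect("right: " + value);
                }
            })
            .print();
        env.execute();
    }
}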

2.2 KeyedProcessFunction [Key Topic]

KeyedProcessFunction extends ProcessFunction and, in its onTimer(...) method, provides access to the key that the firing timer belongs to.

All Process Functions inherit from the RichFunction interface, so they all have:

open()
close()
getRuntimeContext()

KeyedProcessFunction additionally provides two methods:

processElement: called once for every element
onTimer: a callback used for timers
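Before the full case study below, a bare-bones skeleton of my own (not from the course code) shows how the two methods cooperate: processElement registers a processing-time timer, and onTimer is invoked when it fires.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Skeleton only: one timer per element; the types match the (id, temperature) tuples used below.
public class TimerSkeleton extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>, String> {
    @Override
    public void processElement(Tuple2<Integer, Integer> value, Context ctx, Collector<String> out) {
        // schedule a callback 10 seconds from now, in processing time
        long ts = ctx.timerService().currentProcessingTime() + 10000L;
        ctx.timerService().registerProcessingTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // the OnTimerContext exposes the key whose timer fired
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}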

Case study: in server operations we need to monitor the temperature of server racks in real time. If, within a certain period, the temperature exceeds a threshold (100 degrees) and each new reading is higher than the previous one (the temperature keeps rising), an alert must be triggered.

package cn.itcast.day12.process;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.commons.collections.IteratorUtils;

import java.text.SimpleDateFormat;

/**
 * @author lql
 * @time 2024-03-08 13:01:05
 * @description TODO: record format: (id, temperature)
 */
public class SystemMonitorDemo {
    public static void main(String[] args) throws Exception {
        // todo 1) initialize the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo 2) set the parallelism to 1
        env.setParallelism(1);
        // todo 3) connect to the data source
        DataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999);
        // todo 4) convert the incoming lines into tuples
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> tupleDataStream = socketTextStream.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String line) throws Exception {
                String[] arrayData = line.split(",");
                return Tuple2.of(Integer.parseInt(arrayData[0]), Integer.parseInt(arrayData[1]));
            }
        });
        // todo 5) key the stream by rack id
        KeyedStream<Tuple2<Integer, Integer>, Integer> tuple2TupleKeyedStream = tupleDataStream.keyBy(t -> t.f0);
        // todo 6) apply a custom function that extends the KeyedProcessFunction abstract class
        SingleOutputStreamOperator<String> result = tuple2TupleKeyedStream.process(new MyKeyedProcessFunction());
        // todo 7) print the output
        result.printToErr();
        // todo 8) run the job
        env.execute();
    }

    private static class MyKeyedProcessFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>, String> {
        // state holding the reported temperatures
        private ListState<Tuple2<Integer, Integer>> listState = null;
        // date formatter
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // timestamp of the registered timer
        private Long timeTS = 0L;

        /**
         * Initialize resources.
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // instantiate the state object
            this.listState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<Integer, Integer>>(
                    "listState",
                    TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})
            ));
            System.out.println("state object initialized...");
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        /**
         * Called when a timer fires.
         * @param timestamp
         * @param ctx
         * @param out
         * @throws Exception
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            System.out.println("timer fired...");
            // copy the state into a list and count its elements
            int stateSize = IteratorUtils.toList(this.listState.get().iterator()).size();
            if (stateSize >= 2) {
                // emit the alert
                out.collect("alert triggered");
            }
            // clear the historical state
            this.listState.clear();
        }

        /**
         * Called for every element in the stream.
         * @param integerIntegerTuple2
         * @param context
         * @param collector
         * @throws Exception
         */
        @Override
        public void processElement(Tuple2<Integer, Integer> integerIntegerTuple2, Context context, Collector<String> collector) throws Exception {
            // fetch the previously stored reading from state
            Tuple2<Integer, Integer> lastData = null;
            for (Tuple2<Integer, Integer> tuple : listState.get()) {
                lastData = tuple;
            }
            // fall back to a default if the state is empty
            if (lastData == null) {
                lastData = Tuple2.of(0, 0);
            }
            System.out.println("last value read from state: " + lastData);
            if (integerIntegerTuple2.f1 > 100 && integerIntegerTuple2.f1 > lastData.f1) {
                System.out.println("temperature rising... registering a timer!");
                // temperature exceeds 100 and this reading is higher than the previous one
                // store the current reading
                listState.add(Tuple2.of(integerIntegerTuple2.f0, integerIntegerTuple2.f1));
                // register a timer (current processing time + window length = trigger time)
                timeTS = context.timerService().currentProcessingTime() + 10000L;
                context.timerService().registerProcessingTimeTimer(timeTS);
            } else {
                if (integerIntegerTuple2.f1 < lastData.f1) {
                    System.out.println("temperature dropped... deleting the timer!");
                    // delete the timer
                    context.timerService().deleteProcessingTimeTimer(timeTS);
                }
                if (integerIntegerTuple2.f1 < 100) {
                    // clear the stored state
                    listState.clear();
                }
            }
        }
    }
}

Result:

Input:

1,100
1,101

Output:

temperature rising... registering a timer!
alert triggered

2.3 ProcessWindowFunction with Incremental Aggregation

Both reduce and aggregate offer overloads that combine with a ProcessWindowFunction to achieve incremental aggregation (marked with a red star in the original screenshot).

How it works: within a window, elements are pre-aggregated incrementally as they arrive; when the window is about to close, the incremental result is handed to the ProcessWindowFunction as its input for a final full pass.

The benefit: you get incremental aggregation while still being able to access the window's metadata (start time, state, and so on).

2.3.1 Usage Overview

input
    .keyBy(...)
    .timeWindow(...)
    .reduce(
        incrAggregator: ReduceFunction[IN],
        function: ProcessWindowFunction[IN, OUT, K, W])

input
    .keyBy(...)
    .timeWindow(...)
    .aggregate(
        incrAggregator: AggregateFunction[IN, ACC, V],
        windowFunction: ProcessWindowFunction[V, OUT, K, W])

2.3.2 Incremental Window Aggregation with ReduceFunction

Data:

{"userID": "user_1", "eventTime": "2020-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:33", "eventType": "browse", "productID": "product_1", "productPrice": 30}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:34", "eventType": "browse", "productID": "product_1", "productPrice": 20}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:36", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:38", "eventType": "browse", "productID": "product_1", "productPrice": 70}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:40", "eventType": "browse", "productID": "product_1", "productPrice": 20}

Example: for each user (keyBy), within a period of time (window size), find the record with the highest product price that the user browsed (ReduceFunction), and also emit the key and window information.

package cn.itcast.day12.process;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import java.text.SimpleDateFormat;
import java.time.Duration;

/**
 * @author lql
 * @time 2024-03-08 17:06:59
 * @description TODO
 */
public class ReduceAndProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        // todo 3) parse the incoming JSON into a Java bean
        lines.process(new SocketProcessFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserActionLog>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserActionLog>() {
                            @Override
                            public long extractTimestamp(UserActionLog userActionLog, long l) {
                                try {
                                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                    return format.parse(userActionLog.getEventTime()).getTime();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    return 0L;
                                }
                            }
                        }))
                // key by user
                .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
                // tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // window function: within each window, find the record with the highest product price per user
                .reduce(
                        // incremental aggregation
                        new ReduceFunction<UserActionLog>() {
                            @Override
                            public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
                                return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
                            }
                        },
                        // window function; the iterable holds a single element, the pre-aggregated result
                        new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
                                UserActionLog max = elements.iterator().next();
                                System.out.println("elements in the iterable: " + IteratorUtils.toList(elements.iterator()).size());
                                String windowStart = new DateTime(context.window().getStart()).toString("yyyy-MM-dd HH:mm:ss");
                                String windowEnd = new DateTime(context.window().getEnd()).toString("yyyy-MM-dd HH:mm:ss");
                                String record = "key:" + key + "\nwindow start: " + windowStart + "\nwindow end: " + windowEnd + "\nrecord with the highest product price: " + max;
                                out.collect(record);
                            }
                        }
                ).print();
        // todo 4) run the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserActionLog {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Long productPrice;
    }

    /**
     * Parse the incoming JSON into a Java bean.
     */
    private static class SocketProcessFunction extends ProcessFunction<String, UserActionLog> {
        /**
         * Called once for every record.
         * @param s
         * @param context
         * @param collector
         * @throws Exception
         */
        @Override
        public void processElement(String s, Context context, Collector<UserActionLog> collector) throws Exception {
            collector.collect(JSON.parseObject(s, UserActionLog.class));
        }
    }
}

Result:

elements in the iterable: 1
key:user_1
window start: 2020-11-09 10:41:30
window end: 2020-11-09 10:41:35
record with the highest product price: ReduceAndProcessFunction.UserActionLog(userID=user_1, eventTime=2020-11-09 10:41:33, eventType=browse, productID=product_1, productPrice=30)

elements in the iterable: 1
key:user_1
window start: 2020-11-09 10:41:35
window end: 2020-11-09 10:41:40
record with the highest product price: ReduceAndProcessFunction.UserActionLog(userID=user_1, eventTime=2020-11-09 10:41:38, eventType=browse, productID=product_1, productPrice=70)

Summary:

1- Set the parallelism to 1 first, so results are easy to observe with a small amount of data.
2- reduce/aggregate cannot take a RichReduceFunction for the incremental part; it fails with: ReduceFunction of apply can not be a RichFunction.

2.3.3 Incremental Window Aggregation with AggregateFunction

Data:

{"userID": "user_1", "eventTime": "2020-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:33", "eventType": "browse", "productID": "product_1", "productPrice": 30}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:34", "eventType": "browse", "productID": "product_1", "productPrice": 20}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:36", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:38", "eventType": "browse", "productID": "product_1", "productPrice": 70}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:40", "eventType": "browse", "productID": "product_1", "productPrice": 20}

Example: for each user (keyBy), within a period of time (window size), compute the average price of the products browsed (AggregateFunction), and also emit the key and window information.

package cn.itcast.day12.process;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.text.SimpleDateFormat;
import java.time.Duration;

/**
 * @author lql
 * @time 2024-03-08 17:59:42
 * @description TODO
 */
public class AggregateAndProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        // parse the JSON (as if read from Kafka) into a Java bean
        lines.process(new KafkaProcessFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserActionLog>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<UserActionLog>() {
                                    @Override
                                    public long extractTimestamp(UserActionLog element, long recordTimestamp) {
                                        try {
                                            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                            return format.parse(element.getEventTime()).getTime();
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                            return 0L;
                                        }
                                    }
                                }))
                // key by user
                .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
                // tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // window function: average price of the products each user browsed in the window, plus key and window info
                .aggregate(
                        new AggregateFunction<UserActionLog, Tuple2<Long, Long>, Double>() {
                            // 1) initial value
                            // define the accumulator's starting state
                            @Override
                            public Tuple2<Long, Long> createAccumulator() {
                                return new Tuple2<>(0L, 0L);
                            }

                            // 2) accumulate
                            // define how each input record is folded into the accumulator
                            @Override
                            public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
                                accumulator.f0 += 1;
                                accumulator.f1 += value.getProductPrice();
                                return accumulator;
                            }

                            // 3) merge
                            // define how this accumulator merges with an accumulator from state
                            @Override
                            public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
                                acc1.f0 += acc2.f0;
                                acc1.f1 += acc2.f1;
                                return acc1;
                            }

                            @Override
                            public Double getResult(Tuple2<Long, Long> longLongTuple2) {
                                return longLongTuple2.f1 / (longLongTuple2.f0 * 1.0);
                            }
                        },
                        new ProcessWindowFunction<Double, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<Double> elements, Collector<String> out) throws Exception {
                                Double avg = elements.iterator().next();
                                String windowStart = new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                                String windowEnd = new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                                String record = "Key: " + key + " window start: " + windowStart + " window end: " + windowEnd + " average price of browsed products: " + String.format("%.2f", avg);
                                out.collect(record);
                            }
                        }
                ).print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserActionLog {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Long productPrice;
    }

    /**
     * Parse the JSON read from Kafka into a Java bean.
     */
    private static class KafkaProcessFunction extends ProcessFunction<String, UserActionLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserActionLog> out) throws Exception {
            out.collect(JSON.parseObject(value, UserActionLog.class));
        }
    }
}

Result:

Key: user_1 window start: 2020-11-09 10:41:30 window end: 2020-11-09 10:41:35 average price of browsed products: 20.00
Key: user_1 window start: 2020-11-09 10:41:35 window end: 2020-11-09 10:41:40 average price of browsed products: 40.00

Summary:

This approach centers on the accumulator model of aggregate; the ProcessWindowFunction mainly serves to enrich the output with state and window metadata.

2.3.4 Using per-window state in ProcessWindowFunction

Unlike a WindowFunction, a ProcessWindowFunction gives you not only the data inside the window but also two kinds of state:

WindowState: per-window state; it is bound to the window and disappears once the window is purged.
GlobalState: per-key state; it is bound to the key and can accumulate values across multiple windows.

Data:

1000,spark,2
5000,spark,2
6000,spark,3
10000,spark,5

Example:

package cn.itcast.day12.process;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Iterator;

/**
 * @author lql
 * @time 2024-03-08 20:36:12
 * @description TODO
 */
public class WindowStateAndGlobalStateFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        // sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // the DataStream returned by socketTextStream has parallelism 1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = dataWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });
        // key the stream
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);
        // Non-keyed window: skip keyBy and call windowAll with a WindowAssigner
        // Keyed window: call keyBy first, then window with a WindowAssigner
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // calling sum or reduce directly only aggregates within a window; it never accumulates with history
        // requirement: aggregate incrementally within each window AND accumulate across windows
        SingleOutputStreamOperator<String> result = windowed.aggregate(new MyAggFunc(), new MyWindowFunc());
        result.print();
        env.execute();
    }

    private static class MyAggFunc implements AggregateFunction<Tuple2<String, Integer>, Integer, Integer> {
        // create the initial value
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        // fold each record into the running accumulator
        @Override
        public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
            return value.f1 + accumulator;
        }

        // the returned result
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        // only needed for session windows; otherwise it can stay unimplemented
        @Override
        public Integer merge(Integer a, Integer b) {
            return null;
        }
    }

    private static class MyWindowFunc extends ProcessWindowFunction<Integer, String, String, TimeWindow> {
        // one descriptor for window state, one for global state
        private transient ReducingStateDescriptor<Integer> windowStateDescriptor;
        private transient ReducingStateDescriptor<Integer> globalStateDescriptor;

        @Override
        public void open(Configuration parameters) throws Exception {
            windowStateDescriptor = new ReducingStateDescriptor<>(
                    "window",
                    new ReduceFunction<Integer>() {
                        @Override
                        public Integer reduce(Integer value1, Integer value2) throws Exception {
                            return value1 + value2;
                        }
                    }, TypeInformation.of(new TypeHint<Integer>() {}));
            globalStateDescriptor = new ReducingStateDescriptor<>(
                    "global",
                    new ReduceFunction<Integer>() {
                        @Override
                        public Integer reduce(Integer value1, Integer value2) throws Exception {
                            return value1 + value2;
                        }
                    }, TypeInformation.of(new TypeHint<Integer>() {}));
        }

        @Override
        public void process(String key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
            Integer sum = 0;
            Iterator<Integer> iterator = elements.iterator();
            while (iterator.hasNext()) {
                sum += iterator.next();
            }
            ReducingState<Integer> windowState = context.windowState().getReducingState(windowStateDescriptor);
            ReducingState<Integer> globalState = context.globalState().getReducingState(globalStateDescriptor);
            // iterate the elements with a lambda, adding each value t to both states
            elements.forEach(t -> {
                try {
                    windowState.add(t);
                    globalState.add(t);
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
            });
            out.collect(key + ",window:" + windowState.get() + ",global:" + globalState.get());
        }
    }
}

Result:

1> spark,window:2,global:2

1> spark,window:5,global:7

Summary:

1- After declaring the two state descriptors, you must override the open method.
2- In open, create each descriptor with new ReducingStateDescriptor, overriding reduce to perform the accumulation.
3- In process, iterate over the window's elements and sum them.
4- Most importantly, choose between context.windowState() and context.globalState(); that is the key difference!

3. Dual-Stream Joins

3.1 Interview Notes

Broadly speaking there are only two kinds of join: Window Join and Interval Join.

A Window Join buffers data in window state and performs the join when the window fires:

Tumbling Window Join
Sliding Window Join
Session Window Join

An Interval Join also buffers data in state before processing; the difference is that interval-join state has an expiry mechanism, and cleanup is driven by the arriving data.

3.2 Window Join

3.2.1 Tumbling Window Join

When a tumbling window join is executed, all elements that share a common key and a common tumbling window are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction.

Note that nothing is emitted for the tumbling window [6,7] in the diagram from the Flink docs, because the green stream contains no elements to pair with the orange elements ⑥ and ⑦.

Usage template:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.2.2 Sliding Window Join

When a sliding window join is executed, all elements that share a common key and a common sliding window are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction.

Note that in window [2,3] the orange ② joins the green ③, but in window [1,2] it joins nothing.

Usage template:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.2.3 Session Window Join

When a session window join is executed, all elements with the same key that, when combined, satisfy the session criteria are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction.

Note that in the third session the green stream has no elements, so ⑧ and ⑨ are not joined!

Usage template:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.2.4 Case Demo

Example: use two custom Sources to simulate data, one producing order detail records and the other producing goods records. A window join correlates the two streams.

package cn.itcast.day13.join;

/**
 * @author lql
 * @time 2024-03-09 21:03:00
 * @description TODO:
 * Approach:
 * A window join first uses where and equalTo to specify which key to join on; here we join the two streams on goodsId.
 * A 5-second tumbling window is set up, and elements of both streams are matched inside that window.
 * The apply method turns two elements of different types into one element of a new type.
 */

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Case study:
 * Use two custom Sources to simulate data: one Source is order details, the other is goods data. A window join correlates them.
 */
public class JoinDemo01 {
    public static void main(String[] args) throws Exception {
        // todo 1) initialize the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo 2) set the parallelism
        env.setParallelism(1);
        // todo 3) build the sources
        // goods stream
        // the RichSourceFunction subclass does not declare its return type, so it must be given explicitly here!
        SingleOutputStreamOperator<Goods> goodsDataStream = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark());
        // order detail stream
        SingleOutputStreamOperator<OrderItem> orderItemDataStream = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderDetailWatermark());
        DataStream<FactOrderItem> result = goodsDataStream.join(orderItemDataStream)
                // key of the first stream
                .where(Goods::getGoodsId)
                // key of the second stream
                .equalTo(OrderItem::getGoodsId)
                // the window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Goods, OrderItem, FactOrderItem>() {
                    @Override
                    public FactOrderItem join(Goods goods, OrderItem orderItem) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(goods.getGoodsId());
                        factOrderItem.setGoodsName(goods.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(orderItem.getCount()));
                        factOrderItem.setTotalMoney(goods.getGoodsPrice().multiply(new BigDecimal(orderItem.getCount())));
                        return factOrderItem;
                    }
                });
        result.printToErr();
        env.execute();
    }

    // goods bean
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // order detail bean
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // join result bean
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;
        private String itemId;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // a goods Stream source (think of it as a dimension table)
    public static class GoodsSource11 extends RichSourceFunction<Goods> {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while (!isCancel) {
                // stream() turns the list into a stream;
                // the lambda collects each Goods object one by one
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // an order detail Stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // no concrete watermark strategy is specified, so both generator callbacks are implemented by hand!
    /**
     * Watermark strategy for the goods stream.
     */
    private static class GoodsWatermark implements WatermarkStrategy<Goods> {
        // a custom WatermarkGenerator must implement both callbacks (see the custom-watermark chapter)
        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() { // the two callbacks to implement
                @Override
                public void onEvent(Goods goods, long eventTimestamp, WatermarkOutput watermarkOutput) {
                    System.out.println("goods event time: " + System.currentTimeMillis());
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }

        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            // each Goods element is assigned the current wall-clock timestamp
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
    }

    /**
     * Watermark strategy for the order detail stream.
     */
    public static class OrderDetailWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    System.out.println("order item event time: " + System.currentTimeMillis());
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }

        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
    }
}

Result:

order item event time: 1709991872660
goods event time: 1709991872660
order item event time: 1709991872661
goods event time: 1709991872661
goods event time: 1709991872661
goods event time: 1709991872661
goods event time: 1709991872661
goods event time: 1709991872661
order item event time: 1709991873665
goods event time: 1709991873665
order item event time: 1709991873665
goods event time: 1709991873665
goods event time: 1709991873665
goods event time: 1709991873665
goods event time: 1709991873665
goods event time: 1709991873665
order item event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
order item event time: 1709991874665
{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}

Summary:

1- Take care when defining the Java bean classes that carry the stream records.
2- For windowed streams, pay attention to how the watermark generator is built and when watermarks are emitted.
3- A JoinFunction must override the join method.

3.3 Interval Join

3.3.1 Interval Join Basics

An interval join also joins two streams (stream A and stream B) on the same key, with the extra condition that the timestamp of a stream B element must lie within an interval relative to the timestamp of the stream A element:

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

That is, the stream B element's timestamp must be >= the stream A element's timestamp plus the lower bound (negative), and <= the stream A element's timestamp plus the upper bound (positive). For example, with lowerBound = -2 ms and upperBound = 1 ms, a stream A element at timestamp 3 joins stream B elements whose timestamps fall in [1, 4].

These bounds are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive() can be applied to change that behavior!

Usage template:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

3.3.2 Case Demo

Example:

package cn.itcast.day13.join;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-03-09 22:27:18
 * @description TODO
 */
public class JoinDemo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // build the goods stream
        DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark());
        // build the order detail stream
        DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark());
        // run the interval join
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                .between(Time.seconds(-1), Time.seconds(0))
                .upperBoundExclusive()
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
                        out.collect(factOrderItem);
                    }
                });
        factOrderItemDS.print();
        env.execute("Interval JOIN");
    }

    // goods bean
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // order detail bean
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // join result bean
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // a goods Stream source (think of it as a dimension table)
    public static class GoodsSource11 extends RichSourceFunction<Goods> {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while (!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // an order detail Stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // watermark assigners (kept simple here): just use the system time
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}

Result:

5> {"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}

3> {"count":9,"goodsId":"3","goodsName":"MacBookPro","totalMoney":135000}

Summary:

1- connect + broadcast suits data that hardly ever changes.
2- BroadcastState suits data that changes, but not too quickly.
3- A dual-stream join suits streaming data that changes rapidly (stock prices, for example).
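Points 1 and 2 mention connect + broadcast and BroadcastState, which these notes never demonstrate, so here is a minimal sketch of my own (the stream variables, descriptor name, and field layout are assumptions, not from the course code) of enriching a fast fact stream with a slowly changing dimension stream:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

...

// assumed inputs: a fact stream of goodsIds and a slowly changing dimension stream of (goodsId, goodsName)
DataStream<String> factStream = ...
DataStream<Tuple2<String, String>> dimStream = ...

MapStateDescriptor<String, String> dimDescriptor =
        new MapStateDescriptor<>("dim", String.class, String.class);
BroadcastStream<Tuple2<String, String>> broadcastDim = dimStream.broadcast(dimDescriptor);

factStream.connect(broadcastDim)
    .process(new BroadcastProcessFunction<String, Tuple2<String, String>, String>() {
        @Override
        public void processElement(String goodsId, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // look up the broadcast side; may be null if the dimension record has not arrived yet
            String goodsName = ctx.getBroadcastState(dimDescriptor).get(goodsId);
            out.collect(goodsId + " -> " + goodsName);
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> dim, Context ctx, Collector<String> out) throws Exception {
            // every parallel instance receives each dimension record and updates its broadcast state
            ctx.getBroadcastState(dimDescriptor).put(dim.f0, dim.f1);
        }
    })
    .print();

Compared with a dual-stream join, the dimension data is replicated to every parallel instance, so lookups are local, but the whole dimension must fit in memory; that is why this pattern only suits slowly changing data.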
