目录

前言

一、基本处理函数

        1.1处理函数的功能和使用

                1.1.1功能

                1.1.2 使用

        1.2 ProcessFunction解析

                1.2.1抽象方法 .processElement()

                1.2.2非抽象方法 .onTimer()     

                1.2.3处理函数的分类

                      (1)ProcessFunction

                      (2)KeyedProcessFunction

                      (3)ProcessWindowFunction

                      (4)ProcessAllWindowFunction

                      (5)CoProcessFunction

                      (6)ProcessJoinFunction

                      (7)BroadcastProcessFunction

                      (8)KeyedBroadcastProcessFunction

二、按键分区处理函数

        2.1定时器(Timer)和定时服务(TimerService)

        2.2 KeyedProcessFunction

三、窗口处理函数

前言

        无论是基本的转换、聚合,还是更为复杂的窗口操作,其实就是基于DataStream进行转换的,所以可以统称为DataStream API。

        在Flink更底层,我们可以不定义任何具体的算子(比如map,filter或window),而只是提炼出一个统一的“处理”(process)操作------它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

        

一、基本处理函数

        1.1处理函数的功能和使用

                1.1.1功能

                在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。这时就需要使用底层的处理函数。

                处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富有函数类的所有特征,同时可以访问状态(state)和其他运行时的信息。

                1.1.2 使用

                处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

        1.2 ProcessFunction解析

public abstract class ProcessFunction extends AbstractRichFunction {

...

public abstract void processElement(I value, Context ctx, Collector out) throws Exception;

public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {}

...

}

        在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。

        内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

                1.2.1抽象方法 .processElement()

                用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器out来定义的。

value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。ctx:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()out:“收集器”(类型为Collector),用于返回输出数据。使用方式与flatMap算子中的收集器完全一样,直接调用out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

             通过几个参数的分析不难发现,ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能;而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。 

java:

public class ProcessExample {

public static void main(String[] args) throws Exception {

// 设置执行环境

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 创建数据源

DataStream> dataStream = env.fromElements(

new Tuple2<>("Alice", 1),

new Tuple2<>("Bob", 2),

new Tuple2<>("Alice", 3)

);

// 应用 processElement 函数

DataStream result = dataStream.process(new ProcessFunction, String>() {

@Override

public void processElement(Tuple2 value, ReadOnlyContext ctx, Collector out) throws Exception {

// 处理每个元素,这里只是简单地将元素的值转换为字符串并输出

out.collect(value.f0 + ": " + value.f1);

}

});

// 打印结果到控制台

result.print();

// 执行任务

env.execute("Process Element Example");

}

}

scala:

class MyKeyedProcessFunction extends KeyedProcessFunction[String, Tuple2[String, Int], Tuple3[String, String, String]] {

override def processElement(value: Tuple2[String, Int],

ctx: KeyedProcessFunction[String, Tuple2[String, Int], Tuple3[String, String, String]]#Context,

out: Collector[Tuple3[String, String, String]]): Unit = {

// 处理每个元素,这里只是简单地将元素的值翻倍并输出

val key = value.f0

val valueToProcess = value.f1

val processedValue = valueToProcess * 2

val output = Tuple3(key, "processed", processedValue.toString)

out.collect(output)

}

}

/**

在这个示例中,我们定义了一个名为 MyKeyedProcessFunction 的类,它继承自 KeyedProcessFunction。我们使用 Tuple2 类型作为输入元素,其中第一个字段是字符串类型的 key,第二个字段是整数值。输出使用 Tuple3 类型,其中第一个字段是 key,第二个字段是字符串 "processed",第三个字段是处理后的整数值。

在 processElement() 方法中,我们首先获取元素的 key 和值,然后对值进行翻倍处理。最后,我们将处理后的结果封装为 Tuple3 类型的输出,并通过 out.collect() 方法将其发送到下游操作。

请注意,这只是一个简单的示例,你可以根据自己的需求调整 processElement() 方法的逻辑来处理实际数据。

*/

                 1.2.2非抽象方法 .onTimer()     

                     用于处理定时器触发的事件。         

                  这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService来注册的。打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。

                        定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

                        既然有.onTimer()方法做定时触发,我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说ProcessFunction其实可以实现一切功能。

        注意:在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作。

java:

public class OnTimerExample {

public static void main(String[] args) throws Exception {

// 设置执行环境

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream input1 = env.fromElements("Start", "Event1", "Event2");

DataStream input2 = env.fromElements("Timer", "Event3", "Event4");

DataStream> result = input1

.connect(input2)

.flatMap(new Tokenizer())

.keyBy(value -> value.f0)

.window(TumblingProcessingTimeWindows.of(TimeUnit.SECONDS.toMillis(5)))

.process(new RichCoFlatMapFunction, Tuple2, Tuple2>() {

private int count;

private ValueState countState;

@Override

public void open(Configuration parameters) throws Exception {

count = 0;

countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));

}

@Override

public void flatMap1(Tuple2 value, Collector> out) throws Exception {

count++;

countState.update(count);

out.collect(value);

}

@Override

public void flatMap2(Tuple2 value, Collector> out) throws Exception {

// 处理 timer 事件,例如重置计数器或触发其他操作。

int currentCount = countState.value() == null ? 0 : countState.value();

if (currentCount >= 3) { // 当计数器达到3时触发timer事件。

out.collect(new Tuple2<>("TimerTriggered", currentCount)); // 输出触发信息。

countState.clear(); // 重置计数器。

} else {

out.collect(value); // 输出其他事件。

}

}

});

result.print(); // 输出结果。

env.execute("OnTimer Example"); // 执行任务。

}

}

scala:

// 定义一个简单的数据类

case class MyEvent(id: Int, value: String)

// 定义 KeyedProcessFunction

class MyKeyedProcessFunction extends KeyedProcessFunction[Int, MyEvent, Tuple3[Long, String, String]] {

// 定义一个状态来存储事件的时间戳

var timestampState: ValueState[Long] = _

override def open(parameters: Configuration): Unit = {

timestampState = getRuntimeContext().getState(new ValueStateDescriptor[Long]("timestamp", classOf[Long]))

}

override def processElement(value: MyEvent,

ctx: KeyedProcessFunction[Int, MyEvent, Tuple3[Long, String, String]]#Context,

out: Collector[Tuple3[Long, String, String]]): Unit = {

val currentTimestamp = System.currentTimeMillis()

// 更新状态中的时间戳

timestampState.update(currentTimestamp)

// 输出当前时间戳、事件的值和 key

out.collect(new Tuple3(currentTimestamp, value.value, value.id.toString))

}

override def onTimer(timestamp: Long,

ctx: KeyedProcessFunction[Int, MyEvent, Tuple3[Long, String, String]]#OnTimerContext,

out: Collector[Tuple3[Long, String, String]]): Unit = {

// 在这里处理定时器触发的事件,例如输出触发的时间和 key

out.collect(new Tuple3(timestamp, "timer", ctx.getCurrentKey().toString))

}

}

/**

在这个例子中,我们定义了一个 MyKeyedProcessFunction 类,它继承自 KeyedProcessFunction。我们覆盖了 processElement 方法来处理每个事件,并使用 ValueState 来存储每个 key 的时间戳。我们还覆盖了 onTimer 方法来处理定时器触发的事件。

在 onTimer 方法中,我们使用 ctx 参数来获取当前的时间戳和 key,并使用 out 参数来输出结果。你可以根据自己的需求调整这个例子,例如修改事件的时间间隔或处理逻辑。

*/

                1.2.3处理函数的分类

                        DataStream在调用一些转换方法之后,有可能生成新的流类型;例如调用.keyBy()之后得到KeyedStream,进而再调用.window()之后得到WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层API,可彼此之间也会有所差异。

                        Flink提供了8个不同的处理函数:

                        (1)ProcessFunction

                        最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。

                      (2)KeyedProcessFunction

                       对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

                        (3)ProcessWindowFunction

                     开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。

                        (4)ProcessAllWindowFunction

                               同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。

                        (5)CoProcessFunction

                                合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。

                        (6)ProcessJoinFunction

                        间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。

                        (7)BroadcastProcessFunction

                        广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

                        (8)KeyedBroadcastProcessFunction

                        按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。

二、按键分区处理函数

        只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

        2.1定时器(Timer)和定时服务(TimerService)

                定时服务与当前运行的环境有关。ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法:

// 获取当前的处理时间

long currentProcessingTime();

// 获取当前的水位线(事件时间)

long currentWatermark();

// 注册处理时间定时器,当处理时间超过time时触发

void registerProcessingTimeTimer(long time);

// 注册事件时间定时器,当水位线超过time时触发

void registerEventTimeTimer(long time);

// 删除触发时间为time的处理时间定时器

void deleteProcessingTimeTimer(long time);

// 删除触发时间为time的处理时间定时器

void deleteEventTimeTimer(long time);

        六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。需要注意,尽管处理函数中都可以直接访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的DataStream不支持定时器操作,只能获取当前时间。

        TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。

         2.2KeyedProcessFunction

 java:

public class MyKeyedProcessFunctionExample implements KeyedProcessFunction {

@Override

public void open(Configuration parameters) throws Exception {

// 初始化状态

ValueStateDescriptor countStateDescriptor = new ValueStateDescriptor<>(

"count", Integer.class);

countStateDescriptor.initializeSerializer(executionConfig);

ValueState countState = getRuntimeContext().getState(countStateDescriptor);

}

@Override

public void processElement(MyEvent value, ReadOnlyContext ctx, Collector out) throws Exception {

// 处理元素并更新状态

int currentCount = countState.value() == null ? 0 : countState.value();

countState.update(currentCount + 1);

out.collect(new MyResult(value.getKey(), currentCount));

}

@Override

public void processWatermark(Watermark mark, ReadOnlyContext ctx, Collector out) throws Exception {

// 处理 watermark,例如检查是否有延迟的事件需要处理

long time = mark.getTimestamp();

boolean isLate = false; // 假设这里检查是否有延迟的事件

if (isLate) {

// 处理延迟的事件

out.collect(new MyResult("late-event", 0)); // 示例输出,实际应用中可能会有不同的逻辑

}

}

}

/**

在这个示例中,我们创建了一个名为MyKeyedProcessFunctionExample的类,它实现了KeyedProcessFunction接口。我们通过覆盖open方法来初始化状态,使用ValueState来存储每个key的计数。

在processElement方法中,我们获取当前计数并更新状态,然后将结果输出。我们还覆盖了processWatermark方法来处理watermark,这里简单地假设检查是否有延迟的事件。请注意,这只是一个简单的示例,你可以根据自己的需求进行修改和扩展。

*/

scala:

class MyKeyedProcessFunction extends KeyedProcessFunction[String, MyEvent, MyResult] {

var countState: ValueState[Int] = _

override def open(parameters: Configuration): Unit = {

val countStateDescriptor = new ValueStateDescriptor[Int]("count", classOf[Int])

countState = getRuntimeContext.getState(countStateDescriptor)

}

override def processElement(value: MyEvent, ctx: KeyedProcessFunction[String, MyEvent, MyResult]#ReadOnlyContext, out: Collector[MyResult]): Unit = {

val currentCount = countState.value() match {

case Some(c) => c

case None => 0

}

countState.update(currentCount + 1)

out.collect(MyResult(value.key, currentCount))

}

override def processWatermark(mark: Watermark, ctx: KeyedProcessFunction[String, MyEvent, MyResult]#ReadOnlyContext, out: Collector[MyResult]): Unit = {

// 处理 watermark,例如检查是否有延迟的事件需要处理

val isLate = // 假设这里检查是否有延迟的事件

if (isLate) {

// 处理延迟的事件

out.collect(MyResult("late-event", 0)) // 示例输出,实际应用中可能会有不同的逻辑

}

}

}

/**在这个示例中,我们创建了一个名为MyKeyedProcessFunction的类,它继承了KeyedProcessFunction。我们通过覆盖open方法来初始化状态,使用ValueState来存储每个key的计数。在processElement方法中,我们获取当前计数并更新状态,然后将结果输出。

我们还覆盖了processWatermark方法来处理watermark,这里简单地假设检查是否有延迟的事件。请注意,这只是一个简单的示例,你可以根据自己的需求进行修改和扩展。*/

三、窗口处理函数

     除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction

        进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

        窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 )

        .window( TumblingEventTimeWindows.of(Time.seconds(10)) )

        .process(new MyProcessWindowFunction())

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: