Flink Series Articles

I. The Flink Column

The Flink column introduces individual topics systematically, supplemented with concrete examples.

1. Flink Deployment series: covers the basics of deploying and configuring Flink.
2. Flink Basics series: covers the fundamentals of Flink, such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL Basics series: covers basic usage of the Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.
4. Flink Table API and SQL Advanced series: application-oriented Table API and SQL content that is closer to actual production use and involves more demanding development work.
5. Flink Monitoring series: content related to day-to-day operations and monitoring.

II. The Flink Examples Column

The Flink Examples column is a companion to the Flink column. It generally does not introduce conceptual material; instead, it provides concrete, ready-to-use examples. This column has no further sub-directories; the content of each article is clear from its link.

For the entry point to all articles of both columns, see: Flink Series Articles Index.

Table of Contents

Flink Series Articles
I. Overview of Flink Testing
II. Testing User-Defined Functions
    1. Unit testing stateless, time-independent UDFs
        1) Example: MapFunction
        2) Example: FlatMapFunction
    2. Unit testing stateful or time-dependent UDFs and custom operators
        1) DataStream API test dependencies
        2) Table API test dependencies
        3) FlatMap function unit tests
        4) Process Function unit tests
III. Testing Flink Jobs
    1. The JUnit rule MiniClusterWithClientResource

This article describes Flink unit testing in detail, covering stateful functions, stateless functions, and whole jobs, and in particular gives common usage examples for stateless unit tests.

For more on this topic, see the author's Flink column for more systematic and up-to-date coverage.

Apart from the Maven dependencies, this article requires no other setup.

I. Overview of Flink Testing

Apache Flink provides utilities for testing application code at multiple levels of the testing pyramid. The Maven dependencies used by the examples in this article are listed below.

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-core</artifactId>
        <version>4.0.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

II. Testing User-Defined Functions

One can assume that Flink produces correct results outside of the user-defined functions. Therefore, it is recommended to cover the classes that contain the main business logic with as many unit tests as possible.

1. Unit testing stateless, time-independent UDFs

1) Example: MapFunction

Take the following stateless MapFunction as an example.

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}

Such a function can easily be unit tested with your favorite testing framework by passing in suitable arguments and verifying the output:

import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.functions.MapFunction;
import org.junit.Test;

/**
 * @author alanchan
 */
public class TestDemo {

    public static class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @Test
    public void testIncrement() throws Exception {
        IncrementMapFunction incrementer = new IncrementMapFunction();
        assertEquals((Long) 3L, incrementer.map(2L));
    }
}

2) Example: FlatMapFunction

For user-defined functions that use org.apache.flink.util.Collector (such as FlatMapFunction or ProcessFunction), testing is easy if you provide a mock object instead of a real collector. A FlatMapFunction with the same functionality as the IncrementMapFunction above can be unit tested as follows.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * @author alanchan
 */
@RunWith(MockitoJUnitRunner.class)
public class TestDemo2 {

    public static class IncrementFlatMapFunction implements FlatMapFunction<String, Long> {

        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            Long sum = 0L;
            for (String num : value.split(",")) {
                sum += Long.valueOf(num);
            }
            out.collect(sum);
        }
    }

    @Test
    public void testSum() throws Exception {
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
        Collector<Long> collector = mock(Collector.class);
        incrementer.flatMap("1,2,3,4,5", collector);
        Mockito.verify(collector, times(1)).collect(15L);
    }
}
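If you would rather avoid mocking entirely, a small hand-rolled Collector backed by a list works just as well. The following sketch (the RecordingCollector and TestDemo2WithoutMockito names are illustrative, not part of Flink's API) reuses the IncrementFlatMapFunction from the test above.

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.util.Collector;
import org.junit.Test;

public class TestDemo2WithoutMockito {

    // A minimal Collector that simply records every element it receives.
    static class RecordingCollector<T> implements Collector<T> {
        final List<T> collected = new ArrayList<>();

        @Override
        public void collect(T record) {
            collected.add(record);
        }

        @Override
        public void close() {
            // nothing to release
        }
    }

    @Test
    public void testSum() throws Exception {
        TestDemo2.IncrementFlatMapFunction incrementer = new TestDemo2.IncrementFlatMapFunction();
        RecordingCollector<Long> collector = new RecordingCollector<>();

        incrementer.flatMap("1,2,3,4,5", collector);

        assertEquals(Arrays.asList(15L), collector.collected);
    }
}

This keeps the test free of mocking-framework dependencies and also lets you assert on the full sequence of emitted elements rather than individual collect() calls.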

2. Unit testing stateful or time-dependent UDFs and custom operators

Functional testing of user-defined functions that use managed state or timers is more difficult because it involves the interaction between the user code and the Flink runtime. For this, Flink provides a collection of so-called test harnesses that can be used to test user-defined functions as well as custom operators:

OneInputStreamOperatorTestHarness (for operators on a DataStream)
KeyedOneInputStreamOperatorTestHarness (for operators on a KeyedStream)
TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)

To use the test harnesses, an additional set of dependencies is required, such as the DataStream API and Table API test dependencies described below.

1) DataStream API test dependencies

If you want to develop test cases for a job built with the DataStream API, add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>

Among the various test utilities, this module provides MiniCluster, a configurable lightweight Flink cluster that can run inside a JUnit test and execute jobs directly.

2) Table API test dependencies

If you want to test Table API and SQL programs locally in your IDE, add the following dependency in addition to the aforementioned flink-test-utils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>

This automatically brings in the query planner and the runtime, which are needed to plan and execute queries respectively.

The flink-table-test-utils module was introduced in Flink 1.15 and, as of Flink 1.17, is considered experimental.
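With these dependencies in place, a Table API or SQL program can be exercised in an ordinary JUnit test without any cluster. The following is a minimal sketch under that assumption; the class name and query are illustrative, not from the original article.

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Test;

public class TableApiSmokeTest {

    @Test
    public void testSimpleQuery() throws Exception {
        // A local TableEnvironment is enough; no cluster is required.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Build a small inline table with VALUES and run a query against it.
        Table result = tEnv.sqlQuery(
                "SELECT UPPER(city) AS city FROM (VALUES ('sh'), ('bj')) AS t(city)");

        List<String> cities = new ArrayList<>();
        try (CloseableIterator<Row> it = result.execute().collect()) {
            while (it.hasNext()) {
                cities.add((String) it.next().getField(0));
            }
        }

        // Sort to make the assertion independent of row arrival order.
        Collections.sort(cities);
        assertEquals(Arrays.asList("BJ", "SH"), cities);
    }
}

Because flink-table-test-utils puts the planner and runtime on the test classpath, execute().collect() runs the query in-process.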

3) FlatMap function unit tests

The test harnesses can now be used to push records and watermarks into the user-defined function or custom operator, control the processing time, and finally assert on the operator's output, including side output.

Example:

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Unit test for flatMap: for even inputs, emit the original value and its square
 */
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

public class TestStatefulFlatMapDemo3 {

    static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {

        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (value % 2 == 0) {
                out.collect(value);
                out.collect(value * value);
            }
        }
    }

    OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<>(new AlanFlatMapFunction());
        testHarness = new OneInputStreamOperatorTestHarness<>(operator);
        testHarness.open();
    }

    @Test
    public void testFlatMap2() throws Exception {
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
        testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
        testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
        testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
        testHarness.processElement(new StreamRecord<>(7, initialTime + 7));
        testHarness.processElement(new StreamRecord<>(8, initialTime + 8));

        expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
        expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<>(4, initialTime + 4));
        expectedOutput.add(new StreamRecord<>(16, initialTime + 4));
        expectedOutput.add(new StreamRecord<>(6, initialTime + 6));
        expectedOutput.add(new StreamRecord<>(36, initialTime + 6));
        expectedOutput.add(new StreamRecord<>(8, initialTime + 8));
        expectedOutput.add(new StreamRecord<>(64, initialTime + 8));

        TestHarnessUtil.assertOutputEquals("Unexpected output", expectedOutput, testHarness.getOutput());
    }
}

KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector, together with the TypeInformation of the key's type.

Example:

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Key by city and upper-case the city abbreviation
 */
import com.google.common.collect.Lists;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestStatefulFlatMapDemo2 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {

        // The state is only accessible by functions applied on a KeyedStream
        ValueState<User> previousInput;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            previousInput = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("previousInput", User.class));
        }

        @Override
        public void flatMap(User input, Collector<User> out) throws Exception {
            previousInput.update(input);
            input.setCity(input.getCity().toUpperCase());
            out.collect(input);
        }
    }

    AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
    OneInputStreamOperatorTestHarness<User, User> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        alanFlatMapFunction = new AlanFlatMapFunction();
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
                new KeySelector<User, String>() {
                    @Override
                    public String getKey(User value) throws Exception {
                        return value.getCity();
                    }
                }, Types.STRING);
        testHarness.open();
    }

    @Test
    public void testFlatMap() throws Exception {
        testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);

        ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
                new ValueStateDescriptor<>("previousInput", User.class));
        User stateValue = previousInput.value();

        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);

        testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);

        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
                        new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
    }
}
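These harnesses can also exercise the state-handling path across a checkpoint: take a snapshot, build a fresh harness around a new function instance, and restore the snapshot before pushing more elements. Below is a minimal sketch of an additional test method for the TestStatefulFlatMapDemo2 class above (assumptions: the AlanFlatMapFunction and User classes from that test, plus an extra import of org.apache.flink.runtime.checkpoint.OperatorSubtaskState).

@Test
public void testSnapshotAndRestore() throws Exception {
    // Process one element, then snapshot the operator's state.
    testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);
    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 10L);

    // Build a fresh harness around a new function instance and restore the snapshot.
    AlanFlatMapFunction restoredFunction = new AlanFlatMapFunction();
    KeyedOneInputStreamOperatorTestHarness<String, User, User> restoredHarness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    new StreamFlatMap<>(restoredFunction), User::getCity, Types.STRING);
    restoredHarness.initializeState(snapshot);
    restoredHarness.open();

    // The restored operator keeps processing on top of the restored state.
    restoredHarness.processElement(new User(2, "alan", 19, "sh"), 20);
    Assert.assertEquals(1, restoredHarness.extractOutputStreamRecords().size());
}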

4) Process Function unit tests

In addition to the test harnesses shown above, which can be used directly to test a ProcessFunction, Flink also provides a test-harness factory class named ProcessFunctionTestHarnesses that simplifies the instantiation of such harnesses.

OneInputStreamOperatorTestHarness example

import com.google.common.collect.Lists;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestProcessOperatorDemo1 {

    // public abstract class KeyedProcessFunction<K, I, O>
    static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {

        @Override
        public void processElement(String value, Context ctx,
                Collector<String> out) throws Exception {
            ctx.timerService().registerProcessingTimeTimer(50);
            out.collect("vx->" + value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Fired once the registered processing-time timer expires
            out.collect(String.format("Timer triggered at %d", timestamp));
        }
    }

    private OneInputStreamOperatorTestHarness<String, String> testHarness;
    private AlanProcessFunction processFunction;

    @Before
    public void setupTestHarness() throws Exception {
        processFunction = new AlanProcessFunction();
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(processFunction),
                x -> "1",
                Types.STRING);
        // Function time is initialized to 0
        testHarness.open();
    }

    @Test
    public void testProcessElement() throws Exception {
        testHarness.processElement("alanchanchn", 10);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("vx->alanchanchn", 10)),
                testHarness.extractOutputStreamRecords());
    }

    @Test
    public void testOnTimer() throws Exception {
        // test first record
        testHarness.processElement("alanchanchn", 10);
        Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

        // Set function time to 100
        testHarness.setProcessingTime(100);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("vx->alanchanchn", 10),
                        new StreamRecord<>("Timer triggered at 100")),
                testHarness.extractOutputStreamRecords());
    }
}
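Event-time timers can be tested in the same style: instead of advancing the processing time, push a watermark through the harness to fire them. The following is a minimal, self-contained sketch (the class name and assertions are illustrative, not from the original article), using registerEventTimeTimer in place of registerProcessingTimeTimer.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class TestEventTimeTimerDemo {

    static class EventTimeFunction extends KeyedProcessFunction<String, String, String> {

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            // Register an event-time timer relative to the element's timestamp.
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 50);
            out.collect("vx->" + value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect(String.format("Event-time timer fired at %d", timestamp));
        }
    }

    @Test
    public void testEventTimeTimer() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new EventTimeFunction()), x -> "1", Types.STRING);
        harness.open();

        harness.processElement("alanchanchn", 10);
        Assert.assertEquals(1, harness.numEventTimeTimers());

        // Advancing the watermark past the timer's timestamp fires it.
        harness.processWatermark(new Watermark(100));
        Assert.assertEquals(0, harness.numEventTimeTimers());
    }
}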

ProcessFunctionTestHarnesses example

This example uses ProcessFunctionTestHarnesses to verify ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction, and BroadcastProcessFunction, which covers essentially all of them.

import java.util.Arrays;
import java.util.Collections;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestProcessOperatorDemo3 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    // Test ProcessFunction's processElement
    @Test
    public void testProcessFunction() throws Exception {
        // public abstract class ProcessFunction<I, O>
        ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
            @Override
            public void processElement(
                    String value, Context ctx, Collector<String> out) throws Exception {
                out.collect("vx->" + value);
            }
        };

        OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                .forProcessFunction(function);

        harness.processElement("alanchanchn", 10);

        Assert.assertEquals(Collections.singletonList("vx->alanchanchn"), harness.extractOutputValues());
    }

    // Test KeyedProcessFunction's processElement
    @Test
    public void testKeyedProcessFunction() throws Exception {
        // public abstract class KeyedProcessFunction<K, I, O>
        KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {
            @Override
            public void processElement(String value, Context ctx,
                    Collector<String> out) throws Exception {
                out.collect("vx->" + value);
            }
        };

        OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                .forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);

        harness.processElement("alanchan", 10);

        Assert.assertEquals(Collections.singletonList("vx->alanchan"), harness.extractOutputValues());
    }

    // Test CoProcessFunction's processElement1 and processElement2
    @Test
    public void testCoProcessFunction() throws Exception {
        // public abstract class CoProcessFunction<IN1, IN2, OUT>
        CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
            @Override
            public void processElement1(String value, Context ctx,
                    Collector<User> out) throws Exception {
                String[] userStr = value.split(",");
                out.collect(
                        new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
            }

            @Override
            public void processElement2(User value, Context ctx,
                    Collector<User> out) throws Exception {
                out.collect(value);
            }
        };

        TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
                .forCoProcessFunction(function);

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // The output order follows processing order: the element from input 2 was processed first
        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                harness.extractOutputValues());
    }

    // Test KeyedCoProcessFunction's processElement1 and processElement2
    @Test
    public void testKeyedCoProcessFunction() throws Exception {
        // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
        KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {
            @Override
            public void processElement1(String value, Context ctx,
                    Collector<User> out) throws Exception {
                String[] userStr = value.split(",");
                out.collect(
                        new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
            }

            @Override
            public void processElement2(User value, Context ctx,
                    Collector<User> out) throws Exception {
                out.collect(value);
            }
        };

        // public static <K, IN1, IN2, OUT>
        // KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
        // forKeyedCoProcessFunction(
        //     KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
        //     KeySelector<IN1, K> keySelector1,
        //     KeySelector<IN2, K> keySelector2,
        //     TypeInformation<K> keyType)
        KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
                .forKeyedCoProcessFunction(function, new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }
                }, new KeySelector<User, String>() {
                    @Override
                    public String getKey(User value) throws Exception {
                        return value.getCity();
                    }
                }, TypeInformation.of(String.class));

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                harness.extractOutputValues());
    }

    // Test BroadcastProcessFunction's processElement and processBroadcastElement
    @Test
    public void testBroadcastOperator() throws Exception {
        // Describes the broadcast state. Broadcast data format:
        // sh,上海
        // bj,北京
        // public class MapStateDescriptor<UK, UV>
        MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
                String.class,
                String.class);

        // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
        // * @param <IN1> The input type of the non-broadcast side.
        // * @param <IN2> The input type of the broadcast side.
        // * @param <OUT> The output type of the operator.
        BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {

            // Handles the elements of the broadcast stream
            @Override
            public void processBroadcastElement(String value, Context ctx,
                    Collector<User> out) throws Exception {
                System.out.println("Received broadcast element: " + value);
                // Update the broadcast state
                ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
            }

            // Handles the non-broadcast stream, joining in the dimension data
            @Override
            public void processElement(User value, ReadOnlyContext ctx,
                    Collector<User> out) throws Exception {
                // Read the broadcast state
                ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
                value.setCity(state.get(value.getCity()));
                out.collect(value);
            }
        };

        BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
                .forBroadcastProcessFunction(function, broadcastDesc);

        harness.processBroadcastElement("sh,上海", 10);
        harness.processBroadcastElement("bj,北京", 20);

        harness.processElement(new User(2, "alan", 19, "bj"), 10);
        harness.processElement(new User(1, "alanchan", 18, "sh"), 30);

        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "北京"), new User(1, "alanchan", 18, "上海")),
                harness.extractOutputValues());
    }
}

III. Testing Flink Jobs

1. The JUnit rule MiniClusterWithClientResource

Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local embedded mini cluster.

To use MiniClusterWithClientResource, one additional (test-scoped) dependency is required:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>

Let us take the same simple MapFunction from the previous sections as an example.

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package com.win;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class TestExampleIntegrationDemo {

    static class AlanIncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                    .setNumberSlotsPerTaskManager(2)
                    .setNumberTaskManagers(1)
                    .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new AlanIncrementMapFunction())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}

A few remarks on integration testing with MiniClusterWithClientResource:

- To avoid copying the whole pipeline code from production to test, make your sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests (see the sketch after this list).
- The static variable in CollectSink is used here because Flink serializes all operators before distributing them across a cluster. Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. Alternatively, you could write the data to files in a temporary directory with your test sink.
- If your job uses event-time timers, you can implement a custom parallel source function for emitting watermarks.
- It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs which only surface when the pipeline is executed in parallel.
- Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time, since the startup and shutdown of Flink clusters usually dominates the actual execution time of the tests.
- If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline.
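As an illustration of the first remark, one way to make sources and sinks pluggable is to keep the transformation logic in a factory method that accepts a DataStream and returns a DataStream. The sketch below is illustrative (the IncrementPipeline class and buildPipeline method are not from the original article).

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementPipeline {

    static class IncrementMapFunction implements MapFunction<Long, Long> {
        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    // All transformation logic lives here, independent of any concrete source or sink.
    public static DataStream<Long> buildPipeline(DataStream<Long> source) {
        return source.map(new IncrementMapFunction());
    }

    // Production entry point: wires in the real source and sink.
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In production this would be a real connector source, e.g. Kafka.
        DataStream<Long> source = env.fromElements(1L, 2L, 3L);
        buildPipeline(source).print(); // in production: a real sink
        env.execute("increment pipeline");
    }
}

A test like testIncrementPipeline above can then call buildPipeline(env.fromElements(...)).addSink(new CollectSink()) without duplicating any production logic.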

In summary, this article has described Flink unit testing in detail, covering stateful functions, stateless functions, and whole jobs, with common usage examples given in particular for stateless unit tests.
