背景

在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确

测试状态ttl超时的单元测试

首先看一下处理函数:

// 处理函数

public class MyStateProcessFunction extends KeyedProcessFunction {

// 键值分区状态

ValueState previousInput;

@Override

public void open(Configuration parameters) throws Exception {

ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("previousInput", Types.STRING);

// 状态ttl超时时间设置

StateTtlConfig ttlConfig =

StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

// check 10 keys for every state access

.cleanupIncrementally(10, false).build();

stateDescriptor.enableTimeToLive(ttlConfig);

previousInput = getRuntimeContext().getState(stateDescriptor);

}

@Override

public void processElement(String in, Context context, Collector collector) throws Exception {

context.timerService().registerProcessingTimeTimer(100);

String out = (Objects.nonNull(previousInput.value()) ? previousInput.value() : "") + in;

collector.collect(out);

if (!in.contains("NotUpdate")) {// 为了模仿有访问状态,但是不更新状态,正常情况下业务逻辑是访问其他key组的其它state,而一直没有访问的key的状态会在超时时间到之后被清理掉

previousInput.update(in);

}

}

@Override

public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {

if (Objects.nonNull(previousInput.value())) {

out.collect(String.format("timer trigger %s", previousInput.value()));

} else {

out.collect(String.format("timer trigger state clear", previousInput.value()));

}

}

}

单元测试代码:

/**

* 测试状态处理函数,包含状态的ttl配置,以及ontimer方法

**/

@Test

public void testKeyedStateProcessFunction() throws Exception {

MyStateProcessFunction myStateProcessFunction = new MyStateProcessFunction();

OneInputStreamOperatorTestHarness testHarness =

ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x -> "1", Types.STRING);

testHarness.open();

testHarness.processElement("hello", 10);

// 注册了一个定时器,定时器100后过期

Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

// 测试输出

Assert.assertEquals(Lists.newArrayList("hello"), testHarness.extractOutputValues());

ValueState previousInput = myStateProcessFunction.getRuntimeContext()

.getState(new ValueStateDescriptor<>("previousInput", Types.STRING));

// 查看下状态应该已经被设置

Assert.assertEquals("hello", previousInput.value());

testHarness.processElement("world", 10);

// 再次测试输出

Assert.assertEquals(Lists.newArrayList("hello", "helloworld"), testHarness.extractOutputValues());

// 再次查看下状态应该已经被设置

Assert.assertEquals("world", previousInput.value());

// 设置时间为1分钟,让状态超时

testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());

// 触发下状态访问,这样flink就会清理,正常生产中不需要这一步,访问状态本来就一直在进行中,只是可能是其他key分组的状态

testHarness.processElement("NotUpdate1", System.currentTimeMillis());

// 查看下状态应该已经被清理

Assert.assertNull(previousInput.value());

// 设置让定时器过期,顺带确认下状态已经被清理

testHarness.setProcessingTime(100);

// 测试输出(包含两个输入+一个定时器的输出)

Assert.assertEquals(Lists.newArrayList("hello", "helloworld", "NotUpdate1", "timer trigger state clear"),

testHarness.extractOutputValues());

testHarness.close();

}

测试代码中已经包含了详细的注解,我们实现自己的ttl单元测试时可以参考下

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: