背景

上游 Kafka 数据为 debezium-json 格式,由 Flink SQL 关联 Kafka Stream 和 Dim 表打宽写入,由于上有任务重启回到至同一条数据多次进行下游 kafka 导致下游 Flink Stream API 消费导致数据重复处理; 目前的数据格式为 debezium-json 格式,主要的标识符为 C 和 D 标识的数据(包括新增的 C 的数据,删除场景的 D 的数据以及更新场景下被拆分为 D 和 C 标识的数据)

去重思路

参考 Flink SQL 的去重思路,将数据根据唯一标识进行 KEY-BY 操作,并将进入的数据进行存储,根据数据标识 C 或者 D 进行不同逻辑处理:

D 数据进入:需要判断是否之前有处理输出 C 标记数据,如果有则可以将 D 数据直接输出,并且如果 C 数据输出条数大于 1 ,则需要移除一条存储的 C 标记状态并输出该 C 标记数据(即 UPDATE场景触发);如果 C 标记数据为 0,标识没有输出则 D 标记数据也不可输出,存入状态等待 C 标记数据进入处理;如果 C 标记数据为 1,则只需要删除存储的 C 标记状态(即 删除场景触发)。C 数据进入:与 D 数据处理类似,C 数据可以抵消存储在状态中的 D 数据,如果数据第一次进入直接输出 C 并存储;如果 C 数据已有存储,则将当前进入的 C 数据进行存储不可输出(即 重复写入);如果 D 数据大于 0,则 C 和 D 同时输出并移除存储的一条 D 数据(即 更新场景)。

C 数据处理逻辑

D 数据处理逻辑

具体实现

public class DeduplicationFunction

extends KeyedProcessFunction {

// 存储当前ID数据是否已有输出

ValueState isUniqueState;

// D 标识数据存储

ListState deleteDataState;

// C 标识数据存储

ListState createDataState;

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

// 设置状态过期时间

final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))

.neverReturnExpired()

.updateTtlOnCreateAndWrite()

.useProcessingTime()

.build();

final ValueStateDescriptor valueStateDescriptor =

new ValueStateDescriptor<>("isUnique-state", Boolean.class);

valueStateDescriptor.enableTimeToLive(ttlConfig);

isUniqueState = getRuntimeContext().getState(

valueStateDescriptor

);

final ListStateDescriptor deleteDataStateDesc =

new ListStateDescriptor<>("deleteDataState-state", String.class);

deleteDataStateDesc.enableTimeToLive(ttlConfig);

deleteDataState = getRuntimeContext().getListState(

deleteDataStateDesc

);

final ListStateDescriptor createDataStateDesc =

new ListStateDescriptor<>("createDataState-state", String.class);

createDataStateDesc.enableTimeToLive(ttlConfig);

createDataState = getRuntimeContext().getListState(

createDataStateDesc

);

}

@Override

public void processElement(String value, Context ctx, Collector out) throws Exception {

final JSONObject jsonObject = JSONObject.parseObject(value);

final String op = jsonObject.getString("op");

Boolean isUnique = isUniqueState.value();

// isUnique 为 true 说明输出了 C 标识的数据

if (null == isUnique || !isUnique) {

// 如果 isUnique 状态为false,

// 说明之前未处理过该数据,将该ID标记为处理过,

// 并输出该数据

if (op.equalsIgnoreCase("c")) {

// 如果当前数据是C类型

createDataState.add(value);

out.collect(value);

} else if (op.equalsIgnoreCase("d")) {

// 如果当前数据是D类型, 为不可输出数据,存入状态

deleteDataState.add(value);

}

isUniqueState.update(true);

} else {

final List createDataList =

IteratorUtils.toList(createDataState.get().iterator());

final List delDataList =

IteratorUtils.toList(deleteDataState.get().iterator());

// 如果 isUnique 状态为 true ,说明之前已经处理过该数据

if (op.equalsIgnoreCase("d")) {

// 如果当前数据是D类型

// 查找之前是否存在一个 C 操作的版本,

// 如果存在则输出之前的 C 操作数据,

// 否则将当前的 D 操作数据存储在 Keyed List State 中

if (createDataList.size() > 0) {

out.collect(value);

// 将当前 D 操作数据抵消掉之前积累的 C

// 操作数据列表中的一个 C 操作数据

if (createDataList.size() > 1)

// 如果 C 标记数据大于 1 则从存储中移除一个并和 D 数据一起输出

// 可理解为 UPDATE 操作

final String remove = createDataList.remove(0);

// c 和 d 都输出

out.collect(remove);

} else {

// 移除之前存储的 C 状态数据

createDataList.remove(0);

}

createDataState.update(createDataList);

} else {

delDataList.add(value);

deleteDataState.update(delDataList);

}

} else {

// C 标记数据

if (delDataList.size() > 0) {

// 将当前 D 操作数据抵消掉之前积累的 C

// 操作数据 列表中的一个 C 操作数据

final String remove = delDataList.remove(0);

deleteDataState.update(delDataList);

// c 和 d 都输出

out.collect(remove);

out.collect(value);

} else if (createDataList.size() > 0) {

createDataList.add(value);

createDataState.update(createDataList);

} else {

// 当前 状态中 C 数据 和 D 数据都为0 即 相当于 isUnique = false 状态

// 所以需要将 C 数据存储

createDataState.add(value);

out.collect(value);

}

}

}

}

}

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: