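How Flink CDC 3.0 Handles Table Schema Changes
Schema change handling involves three main classes: SchemaOperator, DataSinkWriterOperator (on the sink side), and SchemaRegistry (the coordinator). When SchemaOperator receives a schema change event, it notifies the sink and the coordinator, then waits until the coordinator has finished applying the change before it processes any further data. The detailed flow is described below.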
Prerequisites
Flink CDC version: 3.0
Flink version: 1.18
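The SchemaOperator class
When the source captures a schema change event and pushes it to SchemaOperator, SchemaOperator sends a schema change request to the coordinator (SchemaRegistry). If the event changes the schema of an existing table, SchemaOperator emits a FlushEvent to the sink so that the sink (which is wrapped by DataSinkWriterOperator) flushes the data it holds in memory, and then blocks the data stream.
How SchemaOperator handles a schema change event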
# SchemaOperator
@Override
public void processElement(StreamRecord<Event> streamRecord) {
    Event event = streamRecord.getValue();
    // If this is a schema change event
    if (event instanceof SchemaChangeEvent) {
        TableId tableId = ((SchemaChangeEvent) event).tableId();
        LOG.info(
                "Table {} received SchemaChangeEvent and start to be blocked.",
                tableId.toString());
        // Handle the schema change event
        handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
        return;
    }
    // All other events are forwarded downstream unchanged
    output.collect(streamRecord);
}
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
    // The request will need to send a FlushEvent or block until flushing finished
    // Send the schema change request to the coordinator (SchemaRegistry). The response
    // flag is true for a change to an existing table and false for a table creation.
    SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
    if (response.isShouldSendFlushEvent()) {
        LOG.info(
                "Sending the FlushEvent for table {} in subtask {}.",
                tableId,
                getRuntimeContext().getIndexOfThisSubtask());
        // Send the flush event and the schema change event to the sink
        output.collect(new StreamRecord<>(new FlushEvent(tableId)));
        output.collect(new StreamRecord<>(schemaChangeEvent));
        // The request will block until flushing finished in each sink writer
        // This request asks the coordinator whether the current schema change has finished;
        // if not, it blocks until the coordinator completes the schema change.
        requestReleaseUpstream();
    }
}
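The blocking behaviour described above can be illustrated with a small standalone sketch. It does not use the Flink CDC API: SketchCoordinator, SketchSchemaOperator, BlockingDemo and the string event names are all hypothetical stand-ins, and a plain CompletableFuture is assumed as the waiting mechanism. It only shows the ordering: emit the flush event, emit the schema change event, then wait until the coordinator reports that the change has been applied.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the coordinator; not the real SchemaRegistry API.
class SketchCoordinator {
    private final CompletableFuture<Void> released = new CompletableFuture<>();

    // Mirrors the idea behind isShouldSendFlushEvent(): true for a change to an
    // existing table, false for a table creation.
    boolean requestSchemaChange(String eventType) {
        return !"CREATE_TABLE".equals(eventType);
    }

    // The caller waits on this future until the change has been applied.
    CompletableFuture<Void> requestReleaseUpstream() {
        return released;
    }

    void markSchemaChangeApplied() {
        released.complete(null);
    }
}

// Hypothetical stand-in for the operator that sits in front of the sink.
class SketchSchemaOperator {
    // Thread-safe log of what was emitted, in order.
    final List<String> log = new CopyOnWriteArrayList<>();
    private final SketchCoordinator coordinator;

    SketchSchemaOperator(SketchCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    void handleSchemaChange(String tableId, String eventType) {
        if (coordinator.requestSchemaChange(eventType)) {
            log.add("emit FlushEvent(" + tableId + ")");
            log.add("emit " + eventType + "(" + tableId + ")");
            // Blocks the calling thread until the coordinator releases it.
            coordinator.requestReleaseUpstream().join();
            log.add("released");
        }
    }
}

class BlockingDemo {
    static String run() {
        SketchCoordinator coordinator = new SketchCoordinator();
        SketchSchemaOperator operator = new SketchSchemaOperator(coordinator);
        Thread worker = new Thread(() -> operator.handleSchemaChange("db.t1", "ADD_COLUMN"));
        worker.start();
        try {
            // Wait until both events were emitted; the worker then waits in join().
            while (operator.log.size() < 2) {
                Thread.sleep(5);
            }
            boolean blockedBeforeRelease = !operator.log.contains("released");
            coordinator.markSchemaChangeApplied();
            worker.join();
            return blockedBeforeRelease + " " + operator.log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the worker thread cannot log "released" before markSchemaChangeApplied() is called, which is the same guarantee the operator relies on: no later data is processed until the schema change is done.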
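Sink side
The sink flushes the data that arrived before the change and reports to the coordinator (SchemaRegistry) that the flush has completed.
How the sink handles the schema change event and reports to the coordinator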
# DataSinkWriterOperator: the sink is wrapped in a DataSinkWriterOperator when it is created, so that FlushEvent and CreateTableEvent can be handled
@Override
public void processElement(StreamRecord<Event> element) throws Exception {
    Event event = element.getValue();
    // Handle the FlushEvent
    if (event instanceof FlushEvent) {
        handleFlushEvent(((FlushEvent) event));
        return;
    }
    // CreateTableEvent marks the table as processed directly
    if (event instanceof CreateTableEvent) {
        processedTableIds.add(((CreateTableEvent) event).tableId());
        this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
                .processElement(element);
        return;
    }
    // Check if the table is processed before emitting all other events, because we have to make
    // sure that sink have a view of the full schema before processing any change events,
    // including schema changes.
    ChangeEvent changeEvent = (ChangeEvent) event;
    if (!processedTableIds.contains(changeEvent.tableId())) {
        emitLatestSchema(changeEvent.tableId());
        processedTableIds.add(changeEvent.tableId());
    }
    processedTableIds.add(changeEvent.tableId());
    this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
            .processElement(element);
}
# handleFlushEvent sends a `FlushSuccess` request to the coordinator (SchemaRegistry)
private void handleFlushEvent(FlushEvent event) throws Exception {
    // Flush the buffered data first, then report success for this subtask
    copySinkWriter.flush(false);
    schemaEvolutionClient.notifyFlushSuccess(
            getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
}
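The sink-side rules above (flush before acknowledging, and never pass on a change event for a table whose schema has not been seen) can be tried out in isolation. The following is a minimal sketch under stated assumptions: SketchSinkWriter and its methods are hypothetical names, rows are plain strings, and the coordinator call is replaced by a log entry instead of a real notifyFlushSuccess request.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; none of these names belong to Flink CDC.
class SketchSinkWriter {
    private final int subtask;
    private final List<String> buffer = new ArrayList<>();
    private final Set<String> processedTables = new HashSet<>();
    // Records what the writer did, in order, so the flow can be inspected.
    final List<String> log = new ArrayList<>();

    SketchSinkWriter(int subtask) {
        this.subtask = subtask;
    }

    // A create-table event marks the table as known right away.
    void onCreateTable(String tableId) {
        processedTables.add(tableId);
        log.add("create " + tableId);
    }

    // Any other change event: make sure the schema was seen first, then buffer the row.
    void onChange(String tableId, String row) {
        if (processedTables.add(tableId)) {
            log.add("emit latest schema for " + tableId);
        }
        buffer.add(row);
    }

    // A flush event: write out buffered rows, then report success for this subtask.
    void onFlush(String tableId) {
        log.add("flushed " + buffer.size() + " rows");
        buffer.clear();
        log.add("notifyFlushSuccess(subtask=" + subtask + ", table=" + tableId + ")");
    }
}

class SinkWriterDemo {
    public static void main(String[] args) {
        SketchSinkWriter writer = new SketchSinkWriter(0);
        writer.onCreateTable("db.t1");
        writer.onChange("db.t1", "r1");
        writer.onChange("db.t2", "r2"); // db.t2 was never created here, so its schema is emitted first
        writer.onFlush("db.t1");
        writer.log.forEach(System.out::println);
    }
}
```

The ordering inside onFlush is the point of the sketch: the acknowledgement is only recorded after the buffer has been emptied, so the coordinator never applies a schema change while rows in the old format are still pending.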
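Coordinator
Once the coordinator has received flush-complete notifications from all sink subtasks, it applies the schema change and then completes the pending requestReleaseUpstream request that is waiting for it.
How the coordinator handles a FlushSuccess request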
public void flushSuccess(TableId tableId, int sinkSubtask) {
    flushedSinkWriters.add(sinkSubtask);
    // All sink subtasks have finished flushing
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
        // Apply the schema change
        applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
        // Notify the waiting SchemaOperator that the schema change is complete
        waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));
        if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
            startNextSchemaChangeRequest();
        }
    }
}
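The core of the coordinator logic is a barrier: the schema change is applied only once the set of subtasks that reported a flush equals the set of active sink subtasks. The sketch below shows that barrier on its own. SketchFlushTracker, forParallelism and the string result are hypothetical, it handles a single pending change only, and the step that applies the schema change is reduced to a counter.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "wait for every sink subtask" barrier.
class SketchFlushTracker {
    private final Set<Integer> activeSinkWriters;
    private final Set<Integer> flushedSinkWriters = new HashSet<>();
    // Completed once the (simulated) schema change has been applied.
    private final CompletableFuture<String> releaseFuture = new CompletableFuture<>();
    private int appliedChanges = 0;

    SketchFlushTracker(Set<Integer> activeSinkWriters) {
        this.activeSinkWriters = new HashSet<>(activeSinkWriters);
    }

    // Convenience factory: subtasks are numbered 0..parallelism-1.
    static SketchFlushTracker forParallelism(int parallelism) {
        Set<Integer> active = new HashSet<>();
        for (int i = 0; i < parallelism; i++) {
            active.add(i);
        }
        return new SketchFlushTracker(active);
    }

    CompletableFuture<String> releaseFuture() {
        return releaseFuture;
    }

    int appliedChanges() {
        return appliedChanges;
    }

    void flushSuccess(String tableId, int sinkSubtask) {
        flushedSinkWriters.add(sinkSubtask);
        // Only when every active subtask has reported do we apply and release.
        if (flushedSinkWriters.equals(activeSinkWriters)) {
            appliedChanges++; // stand-in for applying the schema change
            releaseFuture.complete("released " + tableId);
        }
    }
}

class FlushTrackerDemo {
    public static void main(String[] args) {
        SketchFlushTracker tracker = SketchFlushTracker.forParallelism(2);
        tracker.flushSuccess("db.t1", 0);
        System.out.println(tracker.releaseFuture().isDone());
        tracker.flushSuccess("db.t1", 1);
        System.out.println(tracker.releaseFuture().join());
    }
}
```

Comparing whole sets rather than counting notifications means a duplicate report from the same subtask cannot release the upstream early.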
For more details, refer to the source code on GitHub.