How Flink CDC 3.0 Handles Table Schema Changes

Schema changes mainly involve three classes: SchemaOperator, DataSinkWriterOperator (the sink side), and SchemaRegistry (the coordinator). When SchemaOperator receives a schema change event, it notifies the sink and the coordinator, then waits for the coordinator to finish applying the change before processing any further data. The detailed flow is described below.
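
For orientation, the signal that SchemaOperator sends downstream to make the sink flush is a FlushEvent carrying the affected TableId; it shows up later as `new FlushEvent(tableId)` on the operator side and `event.getTableId()` on the sink side. Below is a minimal illustrative shape of that event, not the real class definition from the CDC common module:

```java
// Illustrative sketch only: the real FlushEvent lives in the Flink CDC common event
// package; this just makes the payload of the flush signal explicit.
public class FlushEvent implements Event {
    // The table whose buffered data the sink must flush before the schema change is applied.
    private final TableId tableId;

    public FlushEvent(TableId tableId) {
        this.tableId = tableId;
    }

    public TableId getTableId() {
        return tableId;
    }
}
```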

Prerequisites

CDC version: Flink CDC 3.0

Flink version: Flink 1.18

The SchemaOperator class

When the source captures a schema change event and forwards it to SchemaOperator, SchemaOperator sends a change request to the coordinator (the SchemaRegistry). If it is a real schema change (as opposed to a table creation), SchemaOperator also sends a FlushEvent to the sink so that the sink (which is wrapped by DataSinkWriterOperator) flushes the data buffered in memory, and then blocks the data stream.

SchemaOperator handles the schema change event

```java
// SchemaOperator
@Override
public void processElement(StreamRecord<Event> streamRecord) {
    Event event = streamRecord.getValue();
    // If this is a schema change event
    if (event instanceof SchemaChangeEvent) {
        TableId tableId = ((SchemaChangeEvent) event).tableId();
        LOG.info(
                "Table {} received SchemaChangeEvent and start to be blocked.",
                tableId.toString());
        // Handle the schema change event
        handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
        return;
    }
    output.collect(streamRecord);
}

private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
    // The request will need to send a FlushEvent or block until flushing finished
    // Send the schema change request to the coordinator (SchemaRegistry); the response flag
    // is true for a schema change and false for a table creation.
    SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
    if (response.isShouldSendFlushEvent()) {
        LOG.info(
                "Sending the FlushEvent for table {} in subtask {}.",
                tableId,
                getRuntimeContext().getIndexOfThisSubtask());
        // Send the flush event and the schema change event to the sink
        output.collect(new StreamRecord<>(new FlushEvent(tableId)));
        output.collect(new StreamRecord<>(schemaChangeEvent));
        // The request will block until flushing finished in each sink writer
        // Ask the coordinator whether the schema change has been applied; if not,
        // block and wait until the coordinator finishes the schema change.
        requestReleaseUpstream();
    }
}
```
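
Both `requestSchemaChange` and `requestReleaseUpstream` above are synchronous round trips to the coordinator: the operator sends a coordination request and blocks on the returned future, which is what holds back the data stream. A hedged sketch of that pattern, assuming a `toCoordinator` gateway and an `unwrap` helper; the names and signatures here are illustrative, not verified against the Flink CDC source:

```java
// Sketch: blocking request/response between SchemaOperator and the coordinator.
// `toCoordinator` is assumed to be a gateway that delivers a CoordinationRequest to the
// SchemaRegistry and returns a CompletableFuture<CoordinationResponse>.
private SchemaChangeResponse requestSchemaChange(
        TableId tableId, SchemaChangeEvent schemaChangeEvent) {
    return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
}

private ReleaseUpstreamResponse requestReleaseUpstream() {
    return sendRequestToCoordinator(new ReleaseUpstreamRequest());
}

private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>
        RESPONSE sendRequestToCoordinator(REQUEST request) {
    try {
        CompletableFuture<CoordinationResponse> responseFuture =
                toCoordinator.sendCoordinationRequest(request);
        // Blocking get(): processElement does not return until the coordinator replies,
        // so no further records flow downstream while the schema change is in flight.
        return unwrap(responseFuture.get());
    } catch (Exception e) {
        throw new IllegalStateException("Failed to send request to coordinator: " + request, e);
    }
}
```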

The sink side

The sink flushes the data it buffered before the schema change and reports to the coordinator (SchemaRegistry) that the flush has completed.

The sink handles the schema change event and reports back to the coordinator

```java
// DataSinkWriterOperator: the sink writer is wrapped in DataSinkWriterOperator when the
// sink is created, so that FlushEvent and CreateTableEvent can be handled here.
@Override
public void processElement(StreamRecord<Event> element) throws Exception {
    Event event = element.getValue();

    // Handle the FlushEvent
    if (event instanceof FlushEvent) {
        handleFlushEvent(((FlushEvent) event));
        return;
    }

    // CreateTableEvent marks the table as processed directly
    if (event instanceof CreateTableEvent) {
        processedTableIds.add(((CreateTableEvent) event).tableId());
        getFlinkWriterOperator().processElement(element);
        return;
    }

    // Check if the table is processed before emitting all other events, because we have to make
    // sure that sink have a view of the full schema before processing any change events,
    // including schema changes.
    ChangeEvent changeEvent = (ChangeEvent) event;
    if (!processedTableIds.contains(changeEvent.tableId())) {
        emitLatestSchema(changeEvent.tableId());
        processedTableIds.add(changeEvent.tableId());
    }
    processedTableIds.add(changeEvent.tableId());
    getFlinkWriterOperator().processElement(element);
}
```
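
The `emitLatestSchema` call above is what guarantees the wrapped writer always sees a schema before the first data event of a table, for example after a restart when the operator has lost its in-memory state. A hedged sketch of what it does, assuming the schema evolution client exposes a lookup against the SchemaRegistry (the `getLatestSchema` method name is an assumption):

```java
// Sketch: fetch the latest schema known to the SchemaRegistry and replay it to the wrapped
// writer as a CreateTableEvent, so the sink has a full schema view before any change events.
private void emitLatestSchema(TableId tableId) throws Exception {
    Optional<Schema> schema = schemaEvolutionClient.getLatestSchema(tableId);
    if (schema.isPresent()) {
        // Forward a synthetic CreateTableEvent carrying the latest schema.
        getFlinkWriterOperator()
                .processElement(new StreamRecord<>(new CreateTableEvent(tableId, schema.get())));
    } else {
        throw new RuntimeException(
                "Could not find schema of table " + tableId + " in SchemaRegistry.");
    }
}
```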

```java
// handleFlushEvent: flush the writer, then send a FlushSuccess notification
// to the coordinator (SchemaRegistry).
private void handleFlushEvent(FlushEvent event) throws Exception {
    copySinkWriter.flush(false);
    schemaEvolutionClient.notifyFlushSuccess(
            getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
}
```
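
`schemaEvolutionClient` is the sink-side handle to the coordinator. A hedged sketch of how the FlushSuccess notification can travel over Flink's operator-event channel; the field names and the FlushSuccessEvent payload are illustrative, and only the `notifyFlushSuccess` call used above is taken from the source:

```java
// Sketch: sink-side client that reports flush completion to the SchemaRegistry, which is
// registered as the operator coordinator of SchemaOperator (hence schemaOperatorID).
public class SchemaEvolutionClient {
    private final TaskOperatorEventGateway toCoordinator; // illustrative gateway to the coordinator
    private final OperatorID schemaOperatorID;

    public SchemaEvolutionClient(
            TaskOperatorEventGateway toCoordinator, OperatorID schemaOperatorID) {
        this.toCoordinator = toCoordinator;
        this.schemaOperatorID = schemaOperatorID;
    }

    public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {
        // The coordinator counts these notifications until every sink subtask has reported in.
        toCoordinator.sendOperatorEventToCoordinator(
                schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));
    }
}
```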

The coordinator

Once the coordinator has received flush-completion notifications from all sink subtasks, it applies the schema change and then completes the pending requestReleaseUpstream request that SchemaOperator is waiting on.

The coordinator handles the FlushSuccess request

```java
// Coordinator side: handling the FlushSuccess notification
public void flushSuccess(TableId tableId, int sinkSubtask) {
    flushedSinkWriters.add(sinkSubtask);
    // All sink subtasks have finished flushing
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
        // Apply the schema change to the external system
        applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
        // Notify the waiting SchemaOperator that the schema change has completed
        waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));
        if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
            startNextSchemaChangeRequest();
        }
    }
}
```
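
The actual DDL against the external system is executed by `applySchemaChange`. In the pipeline architecture the coordinator holds the MetadataApplier provided by the DataSink and hands the change event to it; the sketch below assumes roughly that delegation (the field name is illustrative):

```java
// Sketch: the coordinator delegates the structural change to the sink's MetadataApplier,
// e.g. the Doris or StarRocks connector implementation that issues the real DDL.
private void applySchemaChange(TableId tableId, SchemaChangeEvent changeEvent) {
    metadataApplier.applySchemaChange(changeEvent);
    LOG.info("Applied schema change {} to table {}.", changeEvent, tableId);
}
```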

For more details, see the Flink CDC source code on GitHub.

