文章目录

一、Flink&Flink CDC官网二、CDC&Flink CDC介绍1、 什么是cdc2、什么是Flink CDC3、支持的连接器

三、springboot整合Filnk CDC1、官网示例2、Maven依赖1) Flink和Flink CDC版本映射2)具体maven依赖3)项目坑点

3、springboot代码示例1)创建变更监听器2)自定义数据解析器3)创建变更对象4)创建业务处理类5)运行代码监听mysql CDC事件

一、Flink&Flink CDC官网

Flink CDC地址 Flink官网地址

二、CDC&Flink CDC介绍

1、 什么是cdc

CDC:全称是 Change Data Capture,即数据变更捕获技术,具体的含义是 通过识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除;数据库结构的变更调整等),然后将这些更改按发生的顺序完整记录下来,并实时通过中间技术桥梁(消息中间件、TCP等等)将变更顺序消息传送到下游流程或系统的过程。

2、什么是Flink CDC

用于 Apache Flink 的 CDC 连接器是一组源连接器,用于®Apache Flink®,使用变更数据捕获 (CDC) 从不同的数据库引入变更。 Apache Flink 的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎。因此,它可以充分利用Debezium的能力。查看更多关于什么是Debezium的信息。®

3、支持的连接器

三、springboot整合Filnk CDC

1、官网示例

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {

public static void main(String[] args) throws Exception {

MySqlSource mySqlSource = MySqlSource.builder()

.hostname("yourHostname")

.port(yourPort)

.databaseList("yourDatabaseName") // set captured database

.tableList("yourDatabaseName.yourTableName") // set captured table

.username("yourUsername")

.password("yourPassword")

.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String

.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enable checkpoint

env.enableCheckpointing(3000);

env

.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")

// set 4 parallel source tasks

.setParallelism(4)

.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

env.execute("Print MySQL Snapshot + Binlog");

}

}

2、Maven依赖

1) Flink和Flink CDC版本映射

2)具体maven依赖

1.8

UTF-8

UTF-8

2.3.12.RELEASE

2.17.0

3.4.1

4.1.0

1.16.0

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-logging

org.apache.logging.log4j

log4j-core

${log4j2.version}

org.apache.logging.log4j

log4j-api

${log4j2.version}

org.apache.logging.log4j

log4j-slf4j-impl

${log4j2.version}

org.apache.logging.log4j

log4j-1.2-api

${log4j2.version}

org.springframework.boot

spring-boot-starter-log4j2

com.github.pagehelper

pagehelper-spring-boot-starter

1.4.1

mysql

mysql-connector-java

8.0.29

com.alibaba

druid-spring-boot-starter

1.2.4

com.alibaba

easyexcel

3.1.1

org.apache.shardingsphere

sharding-jdbc-spring-boot-starter

4.1.1

com.baomidou

mybatis-plus-boot-starter

${mybatis-plus.version}

com.baomidou

mybatis-plus-generator

${mybatis-plus.version}

org.apache.velocity

velocity-engine-core

2.3

com.github.pagehelper

pagehelper-spring-boot-starter

1.4.1

com.baomidou

dynamic-datasource-spring-boot-starter

3.6.1

org.springframework.boot

spring-boot-starter-validation

org.apache.httpcomponents

httpclient

4.5.14

com.alibaba

fastjson

1.2.76

com.aliyun.oss

aliyun-sdk-oss

3.15.0

com.xuxueli

xxl-job-core

2.4.0

org.apache.flink

flink-streaming-java

${flink.version}

org.apache.flink

flink-clients

${flink.version}

com.ververica

flink-connector-mysql-cdc

2.3.0

mysql

mysql-connector-java

org.apache.flink

flink-table-api-java

1.12.1

3)项目坑点

mysql jdbc驱动包版本过低,项目启动时导致 java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;,找不到该方法,必须引入8.0.28及以上版本flink-connector-mysql-cdc 的2.3.0版本依赖中引入的是8.0.25,需要去除掉

mysql

mysql-connector-java

8.0.29

com.ververica

flink-connector-mysql-cdc

2.3.0

mysql

mysql-connector-java

项目启动报错:java.lang.NoClassDefFoundError: org/apache/flink/table/api/ValidationException,找不到此类,是因为缺少依赖包,引入相关依赖包

org.apache.flink

flink-table-api-java

1.12.1

Flink的版本和Flink CDC的版本一定要兼容,按照官方给定的版本进行引入

版本过低会导致:Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation, 要解决此问题,你可以按照以下步骤操作: 确保使用的数据库用户具有 RELOAD 权限。请登录到 MySQL 数据库,并为用户授予 RELOAD 权限。例如,使用以下命令为用户 your_user 授予 RELOAD 权限:

GRANT RELOAD ON *.* TO 'your_user'@'localhost';

Flink官方解决上述问题 MySQL CDC source使用增量快照算法,避免了数据库锁的使用,因此不需要"RELOAD"权限。 从 Flink 1.12 版本开始,Flink 引入了对 MySQL CDC 的集成和支持。在这个版本中,Flink 提供了 flink-connector-mysql-cdc 模块,用于实现基于 MySQL 的 Change Data Capture 功能。 在 Flink 1.12 版本中,MySQL CDC 源使用了增量快照算法来捕获数据变更,并且不需要 RELOAD 权限。这种实现方式避免了数据库锁的使用,提供了低延迟的数据变更捕获能力。 代码中设置can.incremental.snapshot.enabled开启,详细代码见代码示例

Configuration config = new Configuration();

// 设置增量快照开启为 true

config.setBoolean("scan.incremental.snapshot.enabled", true);

env.configure(config);

3、springboot代码示例

1)创建变更监听器

创建MysqlEventListener 类实现ApplicationRunner ,项目启动时可以启动mysql监听

import com.ververica.cdc.connectors.mysql.source.MySqlSource;

import com.ververica.cdc.connectors.mysql.table.StartupOptions;

import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

/**

* @Description: mysql变更监听器

* @Date: 2023/10/11

**/

public class MysqlEventListener implements ApplicationRunner {

@Override

public void run(ApplicationArguments args) throws Exception {

MySqlSource mySqlSource = MySqlSource.builder()

.hostname("yourHostname")

.port(3306)

.databaseList("yourDatabaseName") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".

.tableList("yourDatabaseName.yourTableName") // 设置捕获的表,数据库.表名

.username("yourUsername")

.password("yourPassword")

.deserializer(new MysqlDeserialization()) // 将 SourceRecord 转换为 自定义对象

/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)

* latest:只进行增量导入(不读取历史变化)

* timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)

*/

.startupOptions(StartupOptions.latest())

.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Configuration config = new Configuration();

// 设置增量快照开启为 true

config.setBoolean("scan.incremental.snapshot.enabled", true);

env.configure(config);

env.setParallelism(1);

// DebeziumSourceFunction dataChangeInfoMySqlSource = buildDataChangeSource();

DataStreamSource streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")

.setParallelism(1);

streamSource.addSink(new DataChangeSink());

env.execute("mysql-stream-cdc");

};

}

2)自定义数据解析器

import com.alibaba.fastjson.JSONObject;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

import io.debezium.data.Envelope;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.util.Collector;

import org.apache.kafka.connect.data.Field;

import org.apache.kafka.connect.data.Schema;

import org.apache.kafka.connect.data.Struct;

import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

import java.util.Optional;

/**

* @Description: mysql自定序列化

* @Date: 2023/10/11

**/

public class MysqlDeserialization implements DebeziumDeserializationSchema {

public static final String TS_MS = "ts_ms";

public static final String BIN_FILE = "file";

public static final String POS = "pos";

public static final String CREATE = "CREATE";

public static final String BEFORE = "before";

public static final String AFTER = "after";

public static final String SOURCE = "source";

public static final String UPDATE = "UPDATE";

/**

* 反序列化数据,转变为自定义对象DataChangeInfo

* @param sourceRecord

* @param collector

* @throws Exception

*/

@Override

public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

String topic = sourceRecord.topic();

String[] fields = topic.split("\\.");

String database = fields[1];

String tableName = fields[2];

Struct struct = (Struct) sourceRecord.value();

final Struct source = struct.getStruct(SOURCE);

DataChangeInfo dataChangeInfo = new DataChangeInfo();

dataChangeInfo.setBeforeData( getJsonObject(struct, BEFORE).toJSONString());

dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());

//5.获取操作类型 CREATE UPDATE DELETE

Envelope.Operation operation = Envelope.operationFor(sourceRecord);

String type = operation.toString().toUpperCase();

int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;

dataChangeInfo.setEventType(eventType);

dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));

dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0));

dataChangeInfo.setDatabase(database);

dataChangeInfo.setTableName(tableName);

dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));

//7.输出数据

collector.collect(dataChangeInfo);

}

/**

* 从元数据获取变更前或者变更后的数据

* @param value

* @param fieldElement

* @return

*/

private JSONObject getJsonObject(Struct value, String fieldElement) {

Struct element = value.getStruct(fieldElement);

JSONObject jsonObject = new JSONObject();

if (element != null) {

Schema afterSchema = element.schema();

List fieldList = afterSchema.fields();

for (Field field : fieldList) {

Object afterValue = element.get(field);

jsonObject.put(field.name(), afterValue);

}

}

return jsonObject;

}

@Override

public TypeInformation getProducedType() {

return TypeInformation.of(DataChangeInfo.class);

}

}

3)创建变更对象

import lombok.Data;

/**

* @Description: 数据变更对象

* @Date: 2023/10/11

**/

@Data

public class DataChangeInfo {

/**

* 变更前数据

*/

private String beforeData;

/**

* 变更后数据

*/

private String afterData;

/**

* 变更类型 1新增 2修改 3删除

*/

private Integer eventType;

/**

* binlog文件名

*/

private String fileName;

/**

* binlog当前读取点位

*/

private Integer filePos;

/**

* 数据库名

*/

private String database;

/**

* 表名

*/

private String tableName;

/**

* 变更时间

*/

private Long changeTime;

}

4)创建业务处理类

import lombok.extern.slf4j.Slf4j;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**

* @Description: 数据处理

* @Date: 2023/10/11

**/

@Slf4j

public class DataChangeSink implements SinkFunction {

@Override

public void invoke(String value, Context context) throws Exception {

log.info("收到变更原始数据:{}", value);

//业务代码

}

}

5)运行代码监听mysql CDC事件

项目启动成功

修改mysql数据库数据 变更前 变更后:点击保存 服务监听结果

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: