文章目录
一、Flink&Flink CDC官网二、CDC&Flink CDC介绍1、 什么是cdc2、什么是Flink CDC3、支持的连接器
三、springboot整合Filnk CDC1、官网示例2、Maven依赖1) Flink和Flink CDC版本映射2)具体maven依赖3)项目坑点
3、springboot代码示例1)创建变更监听器2)自定义数据解析器3)创建变更对象4)创建业务处理类5)运行代码监听mysql CDC事件
一、Flink&Flink CDC官网
Flink CDC地址 Flink官网地址
二、CDC&Flink CDC介绍
1、 什么是cdc
CDC:全称是 Change Data Capture,即数据变更捕获技术,具体的含义是 通过识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除;数据库结构的变更调整等),然后将这些更改按发生的顺序完整记录下来,并实时通过中间技术桥梁(消息中间件、TCP等等)将变更顺序消息传送到下游流程或系统的过程。
2、什么是Flink CDC
用于 Apache Flink 的 CDC 连接器是一组源连接器,用于®Apache Flink®,使用变更数据捕获 (CDC) 从不同的数据库引入变更。 Apache Flink 的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎。因此,它可以充分利用Debezium的能力。查看更多关于什么是Debezium的信息。®
3、支持的连接器
三、springboot整合Filnk CDC
1、官网示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}
2、Maven依赖
1) Flink和Flink CDC版本映射
2)具体maven依赖
3)项目坑点
mysql jdbc驱动包版本过低,项目启动时导致 java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;,找不到该方法,必须引入8.0.28及以上版本flink-connector-mysql-cdc 的2.3.0版本依赖中引入的是8.0.25,需要去除掉
项目启动报错:java.lang.NoClassDefFoundError: org/apache/flink/table/api/ValidationException,找不到此类,是因为缺少依赖包,引入相关依赖包
Flink的版本和Flink CDC的版本一定要兼容,按照官方给定的版本进行引入
版本过低会导致:Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation, 要解决此问题,你可以按照以下步骤操作: 确保使用的数据库用户具有 RELOAD 权限。请登录到 MySQL 数据库,并为用户授予 RELOAD 权限。例如,使用以下命令为用户 your_user 授予 RELOAD 权限:
GRANT RELOAD ON *.* TO 'your_user'@'localhost';
Flink官方解决上述问题 MySQL CDC source使用增量快照算法,避免了数据库锁的使用,因此不需要"RELOAD"权限。 从 Flink 1.12 版本开始,Flink 引入了对 MySQL CDC 的集成和支持。在这个版本中,Flink 提供了 flink-connector-mysql-cdc 模块,用于实现基于 MySQL 的 Change Data Capture 功能。 在 Flink 1.12 版本中,MySQL CDC 源使用了增量快照算法来捕获数据变更,并且不需要 RELOAD 权限。这种实现方式避免了数据库锁的使用,提供了低延迟的数据变更捕获能力。 代码中设置can.incremental.snapshot.enabled开启,详细代码见代码示例
Configuration config = new Configuration();
// 设置增量快照开启为 true
config.setBoolean("scan.incremental.snapshot.enabled", true);
env.configure(config);
3、springboot代码示例
1)创建变更监听器
创建MysqlEventListener 类实现ApplicationRunner ,项目启动时可以启动mysql监听
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
/**
* @Description: mysql变更监听器
* @Date: 2023/10/11
**/
public class MysqlEventListener implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
MySqlSource
.hostname("yourHostname")
.port(3306)
.databaseList("yourDatabaseName") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
.tableList("yourDatabaseName.yourTableName") // 设置捕获的表,数据库.表名
.username("yourUsername")
.password("yourPassword")
.deserializer(new MysqlDeserialization()) // 将 SourceRecord 转换为 自定义对象
/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)
* latest:只进行增量导入(不读取历史变化)
* timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
*/
.startupOptions(StartupOptions.latest())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration config = new Configuration();
// 设置增量快照开启为 true
config.setBoolean("scan.incremental.snapshot.enabled", true);
env.configure(config);
env.setParallelism(1);
// DebeziumSourceFunction
DataStreamSource
.setParallelism(1);
streamSource.addSink(new DataChangeSink());
env.execute("mysql-stream-cdc");
};
}
2)自定义数据解析器
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Optional;
/**
* @Description: mysql自定序列化
* @Date: 2023/10/11
**/
public class MysqlDeserialization implements DebeziumDeserializationSchema
public static final String TS_MS = "ts_ms";
public static final String BIN_FILE = "file";
public static final String POS = "pos";
public static final String CREATE = "CREATE";
public static final String BEFORE = "before";
public static final String AFTER = "after";
public static final String SOURCE = "source";
public static final String UPDATE = "UPDATE";
/**
* 反序列化数据,转变为自定义对象DataChangeInfo
* @param sourceRecord
* @param collector
* @throws Exception
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct struct = (Struct) sourceRecord.value();
final Struct source = struct.getStruct(SOURCE);
DataChangeInfo dataChangeInfo = new DataChangeInfo();
dataChangeInfo.setBeforeData( getJsonObject(struct, BEFORE).toJSONString());
dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
//5.获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toUpperCase();
int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
dataChangeInfo.setEventType(eventType);
dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0));
dataChangeInfo.setDatabase(database);
dataChangeInfo.setTableName(tableName);
dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
//7.输出数据
collector.collect(dataChangeInfo);
}
/**
* 从元数据获取变更前或者变更后的数据
* @param value
* @param fieldElement
* @return
*/
private JSONObject getJsonObject(Struct value, String fieldElement) {
Struct element = value.getStruct(fieldElement);
JSONObject jsonObject = new JSONObject();
if (element != null) {
Schema afterSchema = element.schema();
List
for (Field field : fieldList) {
Object afterValue = element.get(field);
jsonObject.put(field.name(), afterValue);
}
}
return jsonObject;
}
@Override
public TypeInformation
return TypeInformation.of(DataChangeInfo.class);
}
}
3)创建变更对象
import lombok.Data;
/**
* @Description: 数据变更对象
* @Date: 2023/10/11
**/
@Data
public class DataChangeInfo {
/**
* 变更前数据
*/
private String beforeData;
/**
* 变更后数据
*/
private String afterData;
/**
* 变更类型 1新增 2修改 3删除
*/
private Integer eventType;
/**
* binlog文件名
*/
private String fileName;
/**
* binlog当前读取点位
*/
private Integer filePos;
/**
* 数据库名
*/
private String database;
/**
* 表名
*/
private String tableName;
/**
* 变更时间
*/
private Long changeTime;
}
4)创建业务处理类
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
* @Description: 数据处理
* @Date: 2023/10/11
**/
@Slf4j
public class DataChangeSink implements SinkFunction
@Override
public void invoke(String value, Context context) throws Exception {
log.info("收到变更原始数据:{}", value);
//业务代码
}
}
5)运行代码监听mysql CDC事件
项目启动成功
修改mysql数据库数据 变更前 变更后:点击保存 服务监听结果
推荐链接
发表评论