文章目录

MySQL CDC配置第一步: 启用binlog1. 检查MySQL的binlog是否已启用2. 若未启用binlog

第二步: 设置binlog格式为row1. 确保MySQL的binlog格式设置为ROW2. 若未设置为row

第三步: 创建CDC用户

MySQL CDC DataStream API实现1. 定义MySqlSource2. 数据处理3. sink到MySQL

参考

MySQL CDC配置

第一步: 启用binlog

1. 检查MySQL的binlog是否已启用

show variables like '%log_bin%';

2. 若未启用binlog

打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)找到[mysqld]部分,添加如下配置log-bin=mysql-bin # 指定二进制日志文件的名称前缀

server-id=1 # 唯一标识MySQL服务器的数字

expire_logs_days=30 # binlog日志过期时间(按实际情况配置)

保存并关闭配置文件, 并重启MySQL服务使配置生效sudo systemctl restart mysqld

第二步: 设置binlog格式为row

因为要监控表记录变更前后的具体数据, 需要将binlog格式设置为row.

1. 确保MySQL的binlog格式设置为ROW

show variables like '%binlog_format%';

2. 若未设置为row

打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)找到[mysqld]部分,添加如下配置binlog_format=ROW

保存并关闭配置文件, 并重启MySQL服务使配置生效sudo systemctl restart mysqld

第三步: 创建CDC用户

创建一个具备合适权限的MySQL用户, 使得Debezium MySQL connector可以监控数据库的变化.

创建MySQL用户, 用于Flink CDC连接到MySQL数据库 CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'FlinkCDC_123456';

授予该用户适当的权限以访问要采集的数据库和表。 GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY 'FlinkCDC_123456';

使权限生效 FLUSH PRIVILEGES;

MySQL CDC DataStream API实现

所使用软件的版本

java 1.8Scala 2.11Flink 1.14.2Flink CDC 2.3.0Source MySQL 5.7Sink MySQL 5.7jackson 2.10.2

MySQL CDC DataStream API可实现一个job监控采集多个数据库、多个表.

1. 定义MySqlSource

//源数据库连接配置文件

Properties dbProps = DbConfigUtil.loadConfig("mysql.properties");

//Debezium配置

Properties debeziumProps = new Properties();

//decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string

//precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)

//以double值来表示它们,这可能会到值精度丢失

//string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)

debeziumProps.setProperty("decimal.handling.mode", "string");

//Time、date和timestamps可以以不同的精度表示,包括:

//adaptive_time_microseconds(默认值):精确地捕获date、datetime和timestamp的值,使用毫秒、微秒或纳秒精度值,具体取决于数据库列的类型,但 TIME 类型字段除外,它们始终以微秒表示。

//adaptive(不建议使用):以数据库列类型为基础,精确地捕获时间和时间戳值,使用毫秒、微秒或纳秒精度值。

//connect:总是使用 Kafka Connect 内置的 Time、Date 和 Timestamp 表示法表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。

debeziumProps.setProperty("time.precision.mode", "connect");

//MySQL CDC数据源

MySqlSource sourceFunction = MySqlSource.builder()

.hostname(dbProps.getProperty("host"))

.port(Integer.parseInt(dbProps.getProperty("port")))

.databaseList(dbProps.getProperty("database_list").split(","))

.tableList(dbProps.getProperty("table_list").split(","))

.username(dbProps.getProperty("username"))

.password(dbProps.getProperty("password"))

.connectionPoolSize(2)

.serverTimeZone("Asia/Shanghai")

.debeziumProperties(debeziumProps)

.deserializer(new JsonDebeziumDeserializationSchema())

.serverId("6001")

.startupOptions(StartupOptions.initial())

.build();

2. 数据处理

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 启用Checkpoint

env.enableCheckpointing(60000);

// 默认即为EXACTLY_ONCE。设置Checkpoint模式为EXACTLY_ONCE,每条记录在恢复的时候都是精确一次地处理的

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置状态后端

env.setStateBackend(new HashMapStateBackend());

// 设置Checkpoint状态存储系统及目录

env.getCheckpointConfig().setCheckpointStorage("hdfs://ns/flink/checkpoint/mysql_cdc");

// 两次Checkpoint之间的最小暂停时间是500 ms

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints必须在指定的时间内完成,否则被丢弃

env.getCheckpointConfig().setCheckpointTimeout(60000);

//只允许checkpoint连续失败两次

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

// 设置最大并行运行的Checkpoint数量

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 在作业取消时保留外部检查点

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 启用非对齐Checkpoint,可以极大减少背压情况的下Checkpoint次数

env.getCheckpointConfig().enableUnalignedCheckpoints();

//获取数据源

SingleOutputStreamOperator dataStreamSource = env

.addSource(sourceFunction)

.uid("source-01").name("read-from-source");

ObjectMapper mapper = new ObjectMapper();

mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);

//JSON字符串转JsonNode

SingleOutputStreamOperator dataStreamJsonNode = dataStreamSource

.map(line -> mapper.readTree(line))

.uid("map-01").name("source-to-JsonNode");

// 从监控的多个表中过滤出'订单表', 并解析Json的after数据

SingleOutputStreamOperator orderOperator = dataStreamJsonNode

.filter(line -> "order_info".equalsIgnoreCase(line.get("source").get("table").asText()))

.uid("order-info-filter-01").name("filter-order-info")

.map(line -> line.get("after").toString())

.uid("order-info-map-01").name("parse-order-info-after")

.map(line -> mapper.readValue(line, OrderInfo.class))

.uid("order-info-map-02").name("order-info-to-pojo");

3. sink到MySQL

// 定义JdbcSink

SinkFunction orderInfoSink = JdbcSink.sink(

UPSERT_SQL,

(JdbcStatementBuilder) (ps, order) -> new OrderInfoPreparedStatementSetter().setParams(ps, order),

JdbcExecutionOptions.builder()

.withBatchSize(100)

.withBatchIntervalMs(2000)

.withMaxRetries(3)

.build(),

JdbcSinkConnUtil.getConnOptions("sink-mysql.properties")

);

orderOperator.addSink(orderInfoSink);

参考

https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-property-decimal-handling-modehttps://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc.htmlhttps://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: