pom

1.13.0

org.apache.flink

flink-java

${flink-version}

org.apache.flink

flink-streaming-java_2.12

${flink-version}

org.apache.flink

flink-clients_2.12

${flink-version}

org.apache.hadoop

hadoop-client

3.1.3

mysql

mysql-connector-java

8.0.16

org.apache.flink

flink-table-planner-blink_2.12

${flink-version}

com.ververica

flink-connector-mysql-cdc

2.2.1

com.alibaba

fastjson

1.2.75

代码

注意开启checkpoint 和不开启是有区别的(savepoint也可以 启动的flink指定时候 -s savepath)

不开启,如果项目重启了,会重新读取所有的数据

开启了,项目重启了额,会根据保留的信息去读取变化的数据

import com.ververica.cdc.connectors.mysql.MySqlSource;

import com.ververica.cdc.connectors.mysql.table.StartupOptions;

import com.ververica.cdc.debezium.DebeziumSourceFunction;

import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCTest {

public static void main(String[] args) throws Exception {

//1.获取Flink 执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

//1.1 开启CK

// env.enableCheckpointing(5000);

// env.getCheckpointConfig().setCheckpointTimeout(10000);

// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

//

// env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

//2.通过FlinkCDC构建SourceFunction

DebeziumSourceFunction sourceFunction = MySqlSource.builder()

.hostname("9.134.70.1")

.port(3306)

.username("root")

.password("xxxxxxx")

.databaseList("cc")

.tableList("cc.student")

// .deserializer(new StringDebeziumDeserializationSchema())

.deserializer(new JsonDebeziumDeserializationSchema())

.startupOptions(StartupOptions.initial())

.build();

DataStreamSource dataStreamSource = env.addSource(sourceFunction);

//3.数据打印

dataStreamSource.print("==FlinkCDC==");

//4.启动任务

env.execute("FlinkCDC");

}

}

 mysql

 

数据库表

 增加一条数据

打印日志 op:c 是create

==FlinkCDC==> {"before":null,"after":{"id":1,"name":"name1","age":"1"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689245998000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2781,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1689246005928,"transaction":null}

修改一条数据 age=1 ->age=2  op=u 是update

flink打印日志

==FlinkCDC==> {"before":{"id":1,"name":"name1","age":"1"},"after":{"id":1,"name":"name1","age":"2"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689246092000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3049,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1689246099578,"transaction":null}

删除这条数据 op=d 是delete

==FlinkCDC==> {"before":{"id":1,"name":"name1","age":"2"},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689246163000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3333,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1689246170857,"transaction":null}

由于打印的日志太多 我们可以用fastjson稍微封装下 然后传给sink去处理,根据update delete insert实时更新下游数据,还有一个op=r 是读取的数据

还可以使用flinksql

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

public class FlinkSqlCdc {

public static void main(String[] args) throws Exception {

//1.获取执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//2.使用FLINKSQL DDL模式构建CDC 表

tableEnv.executeSql("CREATE TABLE user_info ( " +

" id STRING primary key, " +

" name int, " +

" age STRING " +

") WITH ( " +

" 'connector' = 'mysql-cdc', " +

" 'scan.startup.mode' = 'latest-offset', " +

" 'hostname' = '9.134.70.1', " +

" 'port' = '3306', " +

" 'username' = 'root', " +

" 'password' = 'xxxxx', " +

" 'database-name' = 'cc', " +

" 'table-name' = 'student' " +

")");

//3.查询数据并转换为流输出

Table table = tableEnv.sqlQuery("select * from user_info");

DataStream> retractStream = tableEnv.toRetractStream(table, Row.class);

retractStream.print();

//4.启动

env.execute("FlinkSQLCDC");

}

}

 同样是增删改查,flink打印日志如下

精彩链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: