​ 介绍两种数据库用 flink1.17 做数据实时同步的操作。

第一种:mysql 同步到 sqlserver 第二种:sqlserver 同步到 sqlserver

步骤一,环境的准备

准备一台有 java 环境的centos系统的主机或虚拟机

下载 flink:

https://flink.apache.org/downloads/ ​

下载 mysql、sqlserver 相关 jar 包

# flink cdc 读取源数据的jar包

flink-sql-connector-mysql-cdc-2.4.0.jar

flink-sql-connector-sqlserver-cdc-2.4.0.jar

# flink jdbc 写数据的jar包,与 mssql 的驱动包

flink-connector-jdbc-3.1.1-1.17.jar

mssql-jdbc-9.4.1.jre8.jar

以上jar下载地址:

# flink cdc

https://repo1.maven.org/maven2/com/ververica/

# flink connector

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/

# mssql jdbc 驱动

https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/

将这些jar包放在 flink 的 lib 目录下

启动 flink

# 转到 flink 的 bin目录下,执行 start-cluster.sh 启动 flink

./start-cluster.sh

flink启动成功!

步骤二,数据库开启CDC(只需要对源数据库开启cdc)

1,mysql 的 cdc,需要开启 row 模式 以自建的 testcdc 数据库为例 创建test表,必须要有主键,如下图

开启数据库row模式,配置文件中修改,并重启数据库,如下图。 建议你安装 宝塔 面板对mysql数据库进行操作。

mysql配置完成!

2,sqlserver 的 cdc 需要代理支持

以 自建的 flinkcdc 数据库为例

-- 对当前数据库启用 CDC

USE [flinkcdc]

EXECUTE sys.sp_cdc_enable_db;

-- 查看数据库是否启用cdc

SELECT name,is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;

创建需要cdc的表testcdc 表 testcdc 启用cdc

-- 启用cdc

EXEC sys.sp_cdc_enable_table

@source_schema = 'dbo',

@source_name = 'testcdc',

@role_name = 'testRole', -- 角色要赋予权限

@supports_net_changes = 0

-- 查看当前数据库表是否启用cdc

SELECT name,is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1;

EXEC sys.sp_cdc_help_change_data_capture

特别注意的时角色需要赋予权限,这里为了方便直接使用的是db_owner,并且要开启代理。 代理会自动部署相关作业,如下图。

sqlserver数据库的cdc完成!

步骤三,建立 flink 与 数据库 的连接

# 启动 flink 的 sql-client.sh

./sql-client.sh

启动成功!如下图 建议cdc源映射关系

CREATE TABLE source (

id INT,

name varchar(100),

create_time date,

PRIMARY KEY(id) NOT ENFORCED

) WITH (

'connector' = 'mysql-cdc',

'hostname' = '192.168.6.130',

'port' = '3306',

'username' = 'root',

'password' = 'root',

'database-name' = 'testcdc',

'table-name' = 'test'

);

查看数据

select * from source;

建立目标映射,jdbc写数据

CREATE TABLE test (

id INT,

name varchar(100),

create_time date,

PRIMARY KEY(id) NOT ENFORCED

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:sqlserver://192.168.6.1:1433;DatabaseName=flinkcdc',

'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',

'username' = 'sa',

'password' = '你的数据库密码',

'table-name' = 'dbo.test'

);

步骤四,同步数据

insert into test(id,name,create_time)

select id,name,create_time from source;

数据就从mysql源表 写入到 sqlserver 目标表中了(这里的目标表是在sqlserver中创建了一张名为test的表)。 同理,sqlserver 同步 到sqlserver 跟上面mysql 同步到 sqlserver是一样的。 只是 源 映射需要使用,如下:

CREATE TABLE source(

id INT,

name varchar(100),

create_time date,

PRIMARY KEY(id) NOT ENFORCED

) WITH (

'connector' = 'sqlserver-cdc',

'hostname' = '192.168.6.1',

'port' = '1433',

'username' = 'sa',

'password' = '你的密码',

'database-name' = 'flinkcdc',

'table-name' = 'dbo.testcdc'

);

你也可以在flink的web页面查看同步job

Job ID: 881f141de6b8631395571a9d879e3e32

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: