Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引

文章目录

Flink 系列文章一、maxwell Format1、maxwell介绍2、binlog设置及验证1)、配置2)、重启mysql3)、验证

3、部署1)、下载2)、解压3)、创建元数据库4)、启动方式

4、示例1:maxwell CDC 输出至控制台1)、启动maxwell2)、操作mysql监控的数据库,观察其控制台输出

5、示例2:maxwell CDC 输出至kafka1)、启动maxwell2)、通过命令行打开kafka消费者3)、操作mysql监控的数据库,观察其控制台输出

二、Flink 与 maxwell 实践1、maven依赖2、Flink sql client 建表示例3、Available Metadata4、Format 参数5、重要事项:重复的变更事件6、数据类型映射

本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,还依赖maxwell、kafka和flink的环境。

一、maxwell Format

1、maxwell介绍

Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySQL中的数据变化实时流式传输到Kafka、Kinesis和其他流式连接器中。Maxwell为变更日志提供了统一的格式模式,并支持使用JSON序列化消息。

Flink支持将Maxwell JSON消息解释为INSERT/UPDATE/DELETE Flink SQL系统中的消息。在许多情况下,这对于利用此功能非常有用,例如

将增量数据从数据库同步到其他系统审核日志数据库上的实时物化视图数据库表的临时连接更改历史等等。

Flink还支持将Flink SQL中的INSERT/UPDATE/DELETE消息编码为Maxwell JSON消息,并发送到Kafka等外部系统。但是,截至Flink 1.17版本,Flink无法将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UDPATE_AFTER编码为DELETE和INSERT Maxwell消息。

2、binlog设置及验证

设置binlog需要监控的数据库,本示例使用的数据库是mysql5.7

1)、配置

本示例设置的参数参考下面的配置

[root@server4 ~]# cat /etc/my.cnf

# For advice on how to change settings please see

# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]

......

log-bin=mysql-bin # log-bin的名称,可以是任意名称

binlog-format=row # 推荐该参数,其他的参数视情况而定,比如mixed、statement

server_id=1 # mysql集群环境中不要重复

binlog_do_db=test # test是mysql的数据库名称,如果监控多个数据库,可以添加多个binlog_do_db即可,例如下面示例

# binlog_do_db=test2

# binlog_do_db=test3

.....

STATEMENT模式(SBR) 每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题) ROW模式(RBR) 不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。 MIXED模式(MBR) 以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。

2)、重启mysql

保存配置后重启mysql

service mysqld restart

3)、验证

重启后,可以通过2个简单的方法验证是否设置成功。

mysql默认的安装目录:cd /var/lib/mysql

[root@server4 ~]# cd /var/lib/mysql

[root@server4 mysql]# ll

......

-rw-r----- 1 mysql mysql 154 1月 10 2022 mysql-bin.000001

-rw-r----- 1 mysql mysql 1197 1月 16 12:21 mysql-bin.index

.....

查看mysql-bin.000001文件是否生成,且其大小为154字节。mysql-bin.000001是mysql重启的次数,重启2次则为mysql-bin.000002 在test数据库中创建或添加数据,mysql-bin.000001的大小是否增加

以上情况满足,则说明binlog配置正常

3、部署

1)、下载

去其官网:https://maxwells-daemon.io/quickstart/下载需要的版本。 本示例使用的是:maxwell-1.29.2.tar.gz 注意其不同版本对jdk的要求,最新版本要求jdk11.

2)、解压

解压的目录/usr/local/bigdata/maxwell-1.29.2

tar -zvxf maxwell-1.29.2.tar.gz -C /usr/local/bigdata

[alanchan@server3 maxwell-1.29.2]$ ll

总用量 108

drwxr-xr-x 2 alanchan root 4096 1月 16 05:45 bin

-rw-r--r-- 1 alanchan root 25133 1月 24 2021 config.md

-rw-r--r-- 1 alanchan root 11970 1月 24 2021 config.properties.example

-rw-r--r-- 1 alanchan root 10259 4月 22 2020 kinesis-producer-library.properties.example

drwxr-xr-x 3 alanchan root 12288 1月 27 2021 lib

-rw-r--r-- 1 alanchan root 548 4月 22 2020 LICENSE

-rw-r--r-- 1 alanchan root 470 1月 24 2021 log4j2.xml

-rw-r--r-- 1 alanchan root 3328 1月 27 2021 quickstart.md

-rw-r--r-- 1 alanchan root 1429 1月 27 2021 README.md

3)、创建元数据库

该步骤需要创建一个mysql数据库,用以保存maxwell的元数据,至于访问这个数据库的用户名和密码则视情况而定,下面的内容是其官方上的操作,也就是创建用户、授权。

本文的示例中使用的是root用户,创建的数据库名称为maxwell。

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';

mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';

4)、启动方式

其提供了2种启动方式,即通过命令行参数的形式和通过配置文件的形式,下面是给出的示例

命令行参数形式,输出到控制台 不需要做任何配置即可直接使用,

maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout

# user 和 password 是连接mysql元数据库的账号和密码

# host是被监控的mysql的ip

# producer是maxwell的输出类型,比如stdout、kafka等

配置文件方式,输出到控制台

maxwell --config ../config.properties

# config.properties文件修改内容如下,其他的保持不变,也可以根据自己的需要修改

producer=stdout

# mysql login info

host=192.168.10.44

user=root

password=123456

4、示例1:maxwell CDC 输出至控制台

1)、启动maxwell

部署完成后,不需要做任何的改动即可执行下面的命令

maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout

2)、操作mysql监控的数据库,观察其控制台输出

在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下

[alanchan@server3 bin]$ maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout

Using kafka version: 1.0.0

{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653290,"xid":20392,"commit":true,"data":{"name":"alanchanchn","scores":109.0},"old":{"scores":199.0}}

{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705653456,"xid":20935,"commit":true,"data":{"name":"alan1","scores":5.0}}

{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653461,"xid":20951,"commit":true,"data":{"name":"alan1","scores":109.0},"old":{"scores":5.0}}

{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705653465,"xid":20967,"commit":true,"data":{"name":"alan1","scores":109.0}}

5、示例2:maxwell CDC 输出至kafka

1)、启动maxwell

部署完成后,不需要做任何的改动即可执行下面的命令

maxwell --user='root' --password='rootroot' --host='192.168.10.44' --producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic

[alanchan@server3 bin]$ maxwell --user='root' --password='rootroot' --host='192.168.10.44' --producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic

Using kafka version: 1.0.0

2)、通过命令行打开kafka消费者

kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning

3)、操作mysql监控的数据库,观察其控制台输出

在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下

[alanchan@server1 bin]$ cd ../../kafka_2.12-3.0.0/bin/

[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning

{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705654206,"xid":22158,"commit":true,"data":{"name":"test","scores":100.0}}

{"database":"cdctest","table":"userscoressink","type":"update","ts":1705654220,"xid":22196,"commit":true,"data":{"name":"test","scores":200.0},"old":{"scores":100.0}}

{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705654224,"xid":22210,"commit":true,"data":{"name":"test","scores":200.0}}

二、Flink 与 maxwell 实践

为了使用maxwell格式,使用构建自动化工具(如Maven或SBT)的项目和带有SQL JAR包的SQLClient都需要以下依赖项。

1、maven依赖

该依赖在flink自建工程中已经包含。

org.apache.flink

flink-json

1.17.1

有关如何部署 maxwell 以将变更日志同步到消息队列,请参阅上文的具体事例或想了解更多的信息参考maxwell 文档。

2、Flink sql client 建表示例

maxwell 为变更日志提供了统一的格式,下面是一个从 MySQL 库 userscoressink表中捕获更新操作的简单示例:

{

"database": "cdctest",

"table": "userscoressink",

"type": "update",

"ts": 1705654220,

"xid": 22196,

"commit": true,

"data": {

"name": "test",

"scores": 200.0

},

"old": {

"scores": 100.0

}

}

MySQL userscoressink表有2列(name,scores)。上面的 JSON 消息是 userscoressink表上的一个更新事件,表示数据上scores字段值从100变更成为200。

消息已经同步到了一个 Kafka 主题:alan_maxwell_to_kafka_topic,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。

具体启动maxwell参考本文的第一部分的kafka示例,其他不再赘述。下面的部分仅仅是演示maxwell环境都正常后,在Flink SQL client中的操作。

-- 元数据与 MySQL "userscoressink" 表完全相同

CREATE TABLE userscoressink (

name STRING,

scores FLOAT

) WITH (

'connector' = 'kafka',

'topic' = 'alan_maxwell_to_kafka_topic',

'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'maxwell-json' -- 使用 maxwell-json 格式

);

将 Kafka 主题注册成 Flink 表之后,就可以将 maxwell消息用作变更日志源。

-- 验证,在mysql中新增、修改和删除数据,观察flink sql client 的数据变化

Flink SQL> show tables;

Empty set

Flink SQL> CREATE TABLE userscoressink (

> name STRING,

> scores FLOAT

> ) WITH (

> 'connector' = 'kafka',

> 'topic' = 'alan_maxwell_to_kafka_topic',

> 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',

> 'properties.group.id' = 'testGroup',

> 'scan.startup.mode' = 'earliest-offset',

> 'format' = 'maxwell-json' -- 使用 maxwell-json 格式

> );

[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink;

+----+--------------------------------+--------------------------------+

| op | name | scores |

+----+--------------------------------+--------------------------------+

| +I | test | 100.0 |

| -U | test | 100.0 |

| +U | test | 200.0 |

| -D | test | 200.0 |

Query terminated, received a total of 4 rows

-- 关于MySQL "userscoressink" 表的实时物化视图

-- 按name分组,对scores进行求和

Flink SQL> select name ,sum(scores) sum_scores from userscoressink group by name;

+----+--------------------------------+--------------------------------+

| op | name | sum_scores |

+----+--------------------------------+--------------------------------+

| +I | test | 100.0 |

| -D | test | 100.0 |

| +I | test | 200.0 |

| -D | test | 200.0 |

3、Available Metadata

以下格式元数据可以在表定义中公开为只读(VIRTUAL)列。

只有当相应的连接器转发格式元数据时,格式元数据字段才可用。

截至Flink 1.17版本,只有Kafka连接器能够公开其值格式的元数据字段。

以下示例显示了如何访问Kafka中的Maxwell元数据字段:

CREATE TABLE userscoressink2(

origin_database STRING METADATA FROM 'value.database' VIRTUAL,

origin_table STRING METADATA FROM 'value.table' VIRTUAL,

origin_primary_key_columns ARRAY METADATA FROM 'value.primary-key-columns' VIRTUAL,

origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,

name STRING,

scores FLOAT

) WITH (

'connector' = 'kafka',

'topic' = 'alan_maxwell_to_kafka_topic',

'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',

'properties.group.id' = 'testGroup',

'scan.startup.mode' = 'earliest-offset',

'format' = 'maxwell-json'

);

# 操作步骤如下

Flink SQL> CREATE TABLE userscoressink2(

> origin_database STRING METADATA FROM 'value.database' VIRTUAL,

> origin_table STRING METADATA FROM 'value.table' VIRTUAL,

> origin_primary_key_columns ARRAY METADATA FROM 'value.primary-key-columns' VIRTUAL,

> origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,

> name STRING,

> scores FLOAT

> ) WITH (

> 'connector' = 'kafka',

> 'topic' = 'alan_maxwell_to_kafka_topic',

> 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',

> 'properties.group.id' = 'testGroup',

> 'scan.startup.mode' = 'earliest-offset',

> 'format' = 'maxwell-json'

> );

[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink2;

+----+----------------+---------------+---------------------------+-------------------------+-----+-------+

| op |origin_database | origin_table |origin_primary_key_columns | origin_ts |name |scores |

+----+----------------+---------------+---------------------------+-------------------------+-----+-------+

| +I | cdctest |userscoressink | | 2024-01-19 16:50:06.000 |test | 100.0 |

| -U | cdctest |userscoressink | | 2024-01-19 16:50:20.000 |test | 100.0 |

| +U | cdctest |userscoressink | | 2024-01-19 16:50:20.000 |test | 200.0 |

| -D | cdctest |userscoressink | | 2024-01-19 16:50:24.000 |test | 200.0 |

4、Format 参数

5、重要事项:重复的变更事件

Maxwell应用程序允许每次更改事件只投递一次。在这种情况下,Flink在消费Maxwell生产的事件时效果非常好。如果Maxwell应用程序至少在一次交付中工作,它可能会向Kafka交付重复的更改事件,Flink将获得重复的事件。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置表.exec.source.cdc-events-duplicate设置为true,并在源上定义PRIMARY KEY。框架将生成一个额外的有状态运算符,并使用主键来消除更改事件的重复,并生成一个规范化的更改日志流。

6、数据类型映射

目前,maxwell Format 使用 JSON Format 进行序列化和反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档。

以上,本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: