Table of Contents

I. Overall Architecture

II. Installing and Configuring MySQL

1. Create the mysql user

2. Create the directories used by MySQL

3. Unpack the installation package

4. Configure environment variables

5. Create the MySQL configuration file

6. Initialize the MySQL system

7. Start the mysql server

8. Create the dba user

III. Configuring MySQL Master-Slave Replication

IV. Installing and Deploying Kafka Connector

1. Create the plugin directory

2. Unzip the files into the plugin directory

3. Configure Kafka Connector

(1) Configure the properties file

(2) Distribute to the other nodes

(3) Start in distributed mode

(4) Confirm the connector plugins and auto-created topics

4. Create the source connector

(1) Create the source mysql configuration file

(2) Create the mysql source connector

5. Create the sink connector

(1) Create the target hbase configuration file

(2) Create the hbase sink connector

6. Automatic synchronization of existing data

7. Real-time data synchronization test

References:

        This post demonstrates installing and configuring Kafka Connect plugins to achieve real-time data synchronization from MySQL to HBase. The required environment is described in earlier posts of this column. The relevant software versions are as follows:

JDK: 11.0.22
MySQL: 8.0.16
HBase: 2.5.7
debezium-connector-mysql: 2.4.2
kafka-connect-hbase: 2.0.13

I. Overall Architecture

        The overall architecture is shown in the figure below.

        The table below lists the relevant processes that will run on each of the four nodes. For simplicity, all commands used during installation and deployment are executed as the operating system's root user.

Process                     node1   node2   node3   node4
debezium-connector-mysql              *       *       *
kafka-connect-hbase                   *       *       *

        In addition, MySQL is installed on 172.18.16.156, where two instances are started in a master-slave replication setup: the master instance on port 3306 and the slave instance on port 3307.

        Required installation packages:

mysql-8.0.16-linux-glibc2.12-x86_64.tar.xz
debezium-debezium-connector-mysql-2.4.2.zip
confluentinc-kafka-connect-hbase-2.0.13.zip

        The debezium connector version used here requires JDK 11 or later. In an environment with multiple JDK versions installed, the alternatives command can be used to select the desired version:

[root@vvgg-z2-music-mysqld~]#alternatives --config java

共有 5 个程序提供“java”。

  选择    命令

-----------------------------------------------

   1           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

   2           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java

 + 3           /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/bin/java

   4           /usr/lib/jvm/jre-1.5.0-gcj/bin/java

*  5           /usr/lib/jvm/jdk-11-oracle-x64/bin/java

按 Enter 来保存当前选择[+],或键入选择号码:5

[root@vvgg-z2-music-mysqld~]#java -version

java version "11.0.22" 2024-01-16 LTS

Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)

Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)

[root@vvgg-z2-music-mysqld~]#

        Add the Kafka cluster host names to the /etc/hosts file on 172.18.16.156:

# Edit the file

vim /etc/hosts

        Add the following entries:

172.18.4.126    node1

172.18.4.188    node2

172.18.4.71    node3

172.18.4.86    node4

II. Installing and Configuring MySQL

        Install and configure two MySQL instances, one master and one slave.

1. Create the mysql user

# Execute as the root user

useradd mysql

passwd mysql

2. Create the directories used by MySQL

# Create the data directory; make sure the mysqldata data directory is empty

mkdir -p /data/3306/mysqldata

# Create the binlog directory

mkdir -p /data/3306/dblog

# Create the temporary directory

mkdir -p /data/3306/tmp

# Change the owner of the directories to mysql

chown -R mysql:mysql /data

# Perform the following installation steps as the mysql user

su - mysql

3. Unpack the installation package

# Go to the installation directory

cd ~

# Extract the files from the tar archive

tar xvf mysql-8.0.16-linux-glibc2.12-x86_64.tar.xz

# Create a symbolic link

ln -s mysql-8.0.16-linux-glibc2.12-x86_64 mysql-8.0.16

4. Configure environment variables

# Add the directory containing the MySQL executables to the $PATH environment variable

# Edit the file

vim ~/.bash_profile

# Modify or add the following two lines

PATH=$PATH:$HOME/.local/bin:$HOME/bin:/home/mysql/mysql-8.0.16/bin

export PATH

# Apply the configuration

source ~/.bash_profile
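        As an optional check that the new PATH has taken effect, the client binary should now resolve to the freshly unpacked installation:

# Should print a path under /home/mysql/mysql-8.0.16/bin and version 8.0.16
which mysql
mysql --version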

5. Create the MySQL configuration file

# Edit the file

vim /home/mysql/my_3306.cnf

        The file contents are as follows:

[mysqld]

max_allowed_packet=1G

log_timestamps=system

binlog_transaction_dependency_tracking  = WRITESET

transaction_write_set_extraction        = XXHASH64

binlog_expire_logs_seconds=259200

lower_case_table_names=1

secure_file_priv=''

log_bin_trust_function_creators=on

character-set-server = utf8mb4

default_authentication_plugin=mysql_native_password

basedir=/home/mysql/mysql-8.0.16-linux-glibc2.12-x86_64

datadir=/data/3306/mysqldata

socket=/data/3306/mysqldata/mysql.sock

wait_timeout=30

innodb_buffer_pool_size = 16G

max_connections = 1000

default-time-zone = '+8:00'

port = 3306

skip-name-resolve 

user=mysql

innodb_print_all_deadlocks=1

log_output='table'

slow_query_log = 1

long_query_time = 1

tmp_table_size = 32M

# Enable binlog

log-bin=/data/3306/dblog/mysql-bin

log-bin-index = /data/3306/dblog/mysql-bin.index 

tmpdir = /data/3306/tmp

server-id = 1563306

innodb_data_file_path = ibdata1:1G:autoextend

innodb_data_home_dir = /data/3306/mysqldata

innodb_log_buffer_size = 16M

innodb_log_file_size = 1G

innodb_log_files_in_group = 3

innodb_log_group_home_dir=/data/3306/dblog

innodb_max_dirty_pages_pct = 90

innodb_lock_wait_timeout = 120

gtid-mode = on

enforce_gtid_consistency=true

local_infile=0

log_error='/data/3306/mysqldata/master.err'

skip_symbolic_links=yes

[mysqldump]

quick

max_allowed_packet = 1G

[mysqld_safe]

open-files-limit = 8192

6. Initialize the MySQL system

mysqld --defaults-file=/home/mysql/my_3306.cnf --initialize
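        The --initialize option generates a random temporary password for root@localhost and writes it to the error log configured above; it is needed for the first login in step 8:

# Retrieve the temporary root password from the error log
grep "temporary password" /data/3306/mysqldata/master.err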

7. Start the mysql server

mysqld_safe --defaults-file=/home/mysql/my_3306.cnf &
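        Optionally, confirm the instance came up before connecting; the error log should report that the server is ready for connections:

# Check for a successful startup message in the error log
grep "ready for connections" /data/3306/mysqldata/master.err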

8. Create the dba user

# Connect to the mysql server

mysql -u root -p -S /data/3306/mysqldata/mysql.sock

-- Change the root user's password

alter user user() identified by "123456";

-- Create a new dba account

create user 'dba'@'%' identified with mysql_native_password by '123456';

grant all on *.* to 'dba'@'%' with grant option;

        Repeat steps 2 through 8, replacing 3306 with 3307, to create the slave instance.
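        For reference, a minimal sketch of the settings that would need to differ in my_3307.cnf for the replica instance, assuming the same directory naming convention under /data/3307 and the server-id convention used later by the source connector; everything else can stay the same as in my_3306.cnf:

# my_3307.cnf -- settings that differ from my_3306.cnf (sketch, assumes /data/3307/* directories)
port = 3307
server-id = 1563307
datadir=/data/3307/mysqldata
socket=/data/3307/mysqldata/mysql.sock
tmpdir = /data/3307/tmp
log-bin=/data/3307/dblog/mysql-bin
log-bin-index = /data/3307/dblog/mysql-bin.index
innodb_data_home_dir = /data/3307/mysqldata
innodb_log_group_home_dir=/data/3307/dblog
log_error='/data/3307/mysqldata/master.err'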

III. Configuring MySQL Master-Slave Replication

        Execute on the 3306 master instance:

-- Check the replication position

show master status;

-- Create the replication user and grant privileges

create user 'repl'@'%' identified with mysql_native_password by '123456';

grant replication client,replication slave on *.* to 'repl'@'%';

-- Create a test database, table, and data

create database test;

create table test.t1 (

  id bigint(20) not null auto_increment,

  remark varchar(32) default null comment '备注',

  createtime timestamp not null default current_timestamp comment '创建时间',

  primary key (id));

insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');

        Output:

mysql> show master status;

+------------------+----------+--------------+------------------+------------------------------------------+

| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                        |

+------------------+----------+--------------+------------------+------------------------------------------+

| mysql-bin.000001 |      977 |              |                  | ba615057-e11c-11ee-b80e-246e961c91f8:1-3 |

+------------------+----------+--------------+------------------+------------------------------------------+

1 row in set (0.00 sec)

mysql> create user 'repl'@'%' identified with mysql_native_password by '123456';

Query OK, 0 rows affected (0.01 sec)

mysql> grant replication client,replication slave on *.* to 'repl'@'%';

Query OK, 0 rows affected (0.00 sec)

mysql> create database test;

Query OK, 1 row affected (0.00 sec)

mysql> create table test.t1 (

    ->   id bigint(20) not null auto_increment,

    ->   remark varchar(32) default null comment '备注',

    ->   createtime timestamp not null default current_timestamp comment '创建时间',

    ->   primary key (id));

Query OK, 0 rows affected (0.01 sec)

mysql> insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');

Query OK, 3 rows affected (0.00 sec)

Records: 3  Duplicates: 0  Warnings: 0

        Execute on the 3307 slave instance:

change master to

master_host='172.18.16.156',

master_port=3306,

master_user='repl',

master_password='123456',

master_log_file='mysql-bin.000001',

master_log_pos=977;

start slave;

show slave status\G

select user,host from mysql.user;

select * from test.t1;

        Output:

mysql> change master to

    -> master_host='172.18.16.156',

    -> master_port=3306,

    -> master_user='repl',

    -> master_password='123456',

    -> master_log_file='mysql-bin.000001',

    -> master_log_pos=977;

Query OK, 0 rows affected, 2 warnings (0.00 sec)

mysql> start slave;

Query OK, 0 rows affected (0.01 sec)

mysql> show slave status\G

*************************** 1. row ***************************

               Slave_IO_State: Waiting for master to send event

                  Master_Host: 172.18.16.156

                  Master_User: repl

                  Master_Port: 3306

                Connect_Retry: 60

              Master_Log_File: mysql-bin.000001

          Read_Master_Log_Pos: 2431

               Relay_Log_File: vvgg-z2-music-mysqld-relay-bin.000002

                Relay_Log_Pos: 1776

        Relay_Master_Log_File: mysql-bin.000001

             Slave_IO_Running: Yes

            Slave_SQL_Running: Yes

              Replicate_Do_DB: 

          Replicate_Ignore_DB: 

           Replicate_Do_Table: 

       Replicate_Ignore_Table: 

      Replicate_Wild_Do_Table: 

  Replicate_Wild_Ignore_Table: 

                   Last_Errno: 0

                   Last_Error: 

                 Skip_Counter: 0

          Exec_Master_Log_Pos: 2431

              Relay_Log_Space: 1999

              Until_Condition: None

               Until_Log_File: 

                Until_Log_Pos: 0

           Master_SSL_Allowed: No

           Master_SSL_CA_File: 

           Master_SSL_CA_Path: 

              Master_SSL_Cert: 

            Master_SSL_Cipher: 

               Master_SSL_Key: 

        Seconds_Behind_Master: 0

Master_SSL_Verify_Server_Cert: No

                Last_IO_Errno: 0

                Last_IO_Error: 

               Last_SQL_Errno: 0

               Last_SQL_Error: 

  Replicate_Ignore_Server_Ids: 

             Master_Server_Id: 1563306

                  Master_UUID: ba615057-e11c-11ee-b80e-246e961c91f8

             Master_Info_File: mysql.slave_master_info

                    SQL_Delay: 0

          SQL_Remaining_Delay: NULL

      Slave_SQL_Running_State: Slave has read all relay log; waiting for more updates

           Master_Retry_Count: 86400

                  Master_Bind: 

      Last_IO_Error_Timestamp: 

     Last_SQL_Error_Timestamp: 

               Master_SSL_Crl: 

           Master_SSL_Crlpath: 

           Retrieved_Gtid_Set: ba615057-e11c-11ee-b80e-246e961c91f8:4-8

            Executed_Gtid_Set: ba615057-e11c-11ee-b80e-246e961c91f8:4-8,

c2df1946-e11c-11ee-8026-246e961c91f8:1-3

                Auto_Position: 0

         Replicate_Rewrite_DB: 

                 Channel_Name: 

           Master_TLS_Version: 

       Master_public_key_path: 

        Get_master_public_key: 0

            Network_Namespace: 

1 row in set (0.00 sec)

mysql> select user,host from mysql.user;

+------------------+-----------+

| user             | host      |

+------------------+-----------+

| dba              | %         |

| repl             | %         |

| mysql.infoschema | localhost |

| mysql.session    | localhost |

| mysql.sys        | localhost |

| root             | localhost |

+------------------+-----------+

6 rows in set (0.00 sec)

mysql> select * from test.t1;

+----+------------------+---------------------+

| id | remark           | createtime          |

+----+------------------+---------------------+

|  1 | 第一行:row1     | 2024-03-20 10:25:32 |

|  2 | 第二行:row2     | 2024-03-20 10:25:32 |

|  3 | 第三行:row3     | 2024-03-20 10:25:32 |

+----+------------------+---------------------+

3 rows in set (0.00 sec)

        For details of the MySQL master-slave replication configuration, see "Configuring Asynchronous Replication".
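        Since GTID mode is enabled on both instances, the replica could alternatively be pointed at the master with automatic GTID positioning instead of an explicit binlog file and position. This is an optional variant, not used above, and it requires that all of the master's GTIDs are still available in its binlogs:

-- Optional GTID-based variant of the CHANGE MASTER statement
change master to
master_host='172.18.16.156',
master_port=3306,
master_user='repl',
master_password='123456',
master_auto_position=1;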

IV. Installing and Deploying Kafka Connector

        Execute the following steps on node2.

1. Create the plugin directory

mkdir $KAFKA_HOME/plugins

2. Unzip the files into the plugin directory

# debezium-connector-mysql

unzip debezium-debezium-connector-mysql-2.4.2.zip -d $KAFKA_HOME/plugins/

# kafka-connect-hbase

unzip confluentinc-kafka-connect-hbase-2.0.13.zip -d $KAFKA_HOME/plugins/

3. Configure Kafka Connector

(1) Configure the properties file

# Edit the connect-distributed.properties file

vim $KAFKA_HOME/config/connect-distributed.properties

        The contents are as follows:

bootstrap.servers=node2:9092,node3:9092,node4:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false

value.converter.schemas.enable=false

offset.storage.topic=connect-offsets

offset.storage.replication.factor=3

offset.storage.partitions=3

config.storage.topic=connect-configs

config.storage.replication.factor=3

status.storage.topic=connect-status

status.storage.replication.factor=3

status.storage.partitions=3

offset.flush.interval.ms=10000

plugin.path=/root/kafka_2.13-3.7.0/plugins

(2) Distribute to the other nodes

scp $KAFKA_HOME/config/connect-distributed.properties node3:$KAFKA_HOME/config/

scp $KAFKA_HOME/config/connect-distributed.properties node4:$KAFKA_HOME/config/

scp -r $KAFKA_HOME/plugins node3:$KAFKA_HOME/

scp -r $KAFKA_HOME/plugins node4:$KAFKA_HOME/

(3) Start in distributed mode

        Execute on all three nodes, starting one worker process on each, to provide fault tolerance and load balancing.

connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

# Check the log for ERROR entries

grep ERROR ~/kafka_2.13-3.7.0/logs/connectDistributed.out
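        Optionally, confirm each worker's REST interface is reachable (the Kafka Connect REST API listens on port 8083 by default):

# Should return the Connect worker version information
curl -s http://node2:8083/ | jq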

(4) Confirm the connector plugins and auto-created topics

        View the connector plugins:

curl -X GET http://node2:8083/connector-plugins | jq

        As the output shows, Kafka Connect has recognized the hbase sink and mysql source plugins:

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connector-plugins | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   494  100   494    0     0   4111      0 --:--:-- --:--:-- --:--:--  4116

[

  {

    "class": "io.confluent.connect.hbase.HBaseSinkConnector",

    "type": "sink",

    "version": "2.0.13"

  },

  {

    "class": "io.debezium.connector.mysql.MySqlConnector",

    "type": "source",

    "version": "2.4.2.Final"

  },

  {

    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",

    "type": "source",

    "version": "3.7.0"

  },

  {

    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",

    "type": "source",

    "version": "3.7.0"

  },

  {

    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",

    "type": "source",

    "version": "3.7.0"

  }

]

[root@vvml-yz-hbase-test~]#

        View the topics:

kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        As the output shows, Kafka Connect automatically created the connect-configs, connect-offsets, and connect-status topics at startup:

[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

__consumer_offsets

connect-configs

connect-offsets

connect-status

test-1

test-3

[root@vvml-yz-hbase-test~]#

4. Create the source connector

(1) Create the source mysql configuration file

# Edit the file

vim $KAFKA_HOME/plugins/source-mysql.json

        The contents are as follows:

{

 "name": "mysql-source-connector",

 "config": {

     "connector.class": "io.debezium.connector.mysql.MySqlConnector",

     "tasks.max": "1",

     "topic.prefix": "mysql-hbase-test",

     "database.hostname": "172.18.16.156",

     "database.port": "3307",

     "database.user": "dba",

     "database.password": "123456",

     "database.server.id": "1563307",

     "database.server.name": "dbserver1",

     "database.include.list": "test",

     "schema.history.internal.kafka.bootstrap.servers": "node2:9092,node3:9092,node4:9092",

     "schema.history.internal.kafka.topic": "schemahistory.mysql-hbase-test"

     }

 }

(2) Create the mysql source connector

# Create the connector

curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"

# Check the connector status

curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq

# View the topics

kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        As the output shows, mysql-source-connector is in the RUNNING state, and three topics were automatically created:

[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"

HTTP/1.1 201 Created

Date: Wed, 20 Mar 2024 02:31:30 GMT

Location: http://node2:8083/connectors/mysql-source-connector

Content-Type: application/json

Content-Length: 579

Server: Jetty(9.4.53.v20231009)

{"name":"mysql-source-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","topic.prefix":"mysql-hbase-test","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.server.name":"dbserver1","database.include.list":"test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-hbase-test","name":"mysql-source-connector"},"tasks":[],"type":"source"}

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   182  100   182    0     0  20726      0 --:--:-- --:--:-- --:--:-- 22750

{

  "name": "mysql-source-connector",

  "connector": {

    "state": "RUNNING",

    "worker_id": "172.18.4.188:8083"

  },

  "tasks": [

    {

      "id": 0,

      "state": "RUNNING",

      "worker_id": "172.18.4.188:8083"

    }

  ],

  "type": "source"

}

[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

__consumer_offsets

connect-configs

connect-offsets

connect-status

mysql-hbase-test

mysql-hbase-test.test.t1

schemahistory.mysql-hbase-test

test-1

test-3

[root@vvml-yz-hbase-test~]#
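        Optionally, the captured change events can be inspected directly from the table topic before wiring up the sink; this is a quick verification only and not required for the setup:

# Print the Debezium change events captured for test.t1
kafka-console-consumer.sh --bootstrap-server node2:9092,node3:9092,node4:9092 \
  --topic mysql-hbase-test.test.t1 --from-beginning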

5. Create the sink connector

(1) Create the target hbase configuration file

# Edit the file

vim $KAFKA_HOME/plugins/sink-hbase.json

        The contents are as follows:

{

  "name": "hbase-sink-connector",

  "config": {

    "topics": "mysql-hbase-test.test.t1",

    "tasks.max": "1",

    "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",

    "key.converter":"org.apache.kafka.connect.storage.StringConverter",

    "value.converter":"org.apache.kafka.connect.storage.StringConverter",

    "confluent.topic.bootstrap.servers": "node2:9092,node3:9092,node4:9092",

    "confluent.topic.replication.factor":3,

    "hbase.zookeeper.quorum": "node2,node3,node4",

    "hbase.zookeeper.property.clientPort": "2181",

    "auto.create.tables": "true",

    "auto.create.column.families": "true",

    "table.name.format": "example_table"

  }

}

(2) Create the hbase sink connector

# Create the connector

curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/sink-hbase.json"

# Check the connector status

curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq

# View the consumer groups

kafka-consumer-groups.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        As the output shows, hbase-sink-connector is in the RUNNING state, and a consumer group was automatically created:

[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/sink-hbase.json"

HTTP/1.1 201 Created

Date: Wed, 20 Mar 2024 02:33:11 GMT

Location: http://node2:8083/connectors/hbase-sink-connector

Content-Type: application/json

Content-Length: 654

Server: Jetty(9.4.53.v20231009)

{"name":"hbase-sink-connector","config":{"topics":"mysql-hbase-test.test.t1","tasks.max":"1","connector.class":"io.confluent.connect.hbase.HBaseSinkConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","confluent.topic.bootstrap.servers":"node2:9092,node3:9092,node4:9092","confluent.topic.replication.factor":"3","hbase.zookeeper.quorum":"node2,node3,node4","hbase.zookeeper.property.clientPort":"2181","auto.create.tables":"true","auto.create.column.families":"true","table.name.format":"example_table","name":"hbase-sink-connector"},"tasks":[],"type":"sink"}

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   176  100   176    0     0  23084      0 --:--:-- --:--:-- --:--:-- 25142

{

  "name": "hbase-sink-connector",

  "connector": {

    "state": "RUNNING",

    "worker_id": "172.18.4.71:8083"

  },

  "tasks": [

    {

      "id": 0,

      "state": "RUNNING",

      "worker_id": "172.18.4.71:8083"

    }

  ],

  "type": "sink"

}

[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

connect-hbase-sink-connector

[root@vvml-yz-hbase-test~]#

6. Automatic synchronization of existing data

        The sink connector automatically created the example_table table in HBase and automatically synchronized the three rows of test data that were inserted earlier while configuring MySQL replication:

[root@vvml-yz-hbase-test~]#hbase shell

HBase Shell

Use "help" to get list of supported commands.

Use "exit" to quit this interactive shell.

For Reference, please visit: http://hbase.apache.org/2.0/book.html#shell

Version 2.5.7-hadoop3, r6788f98356dd70b4a7ff766ea7a8298e022e7b95, Thu Dec 14 16:16:10 PST 2023

Took 0.0012 seconds                                                                                                                       

hbase:001:0> list

TABLE                                                                                                                                     

SYSTEM:CATALOG                                                                                                                            

SYSTEM:CHILD_LINK                                                                                                                         

SYSTEM:FUNCTION                                                                                                                           

SYSTEM:LOG                                                                                                                                

SYSTEM:MUTEX                                                                                                                              

SYSTEM:SEQUENCE                                                                                                                           

SYSTEM:STATS                                                                                                                              

SYSTEM:TASK                                                                                                                               

example_table                                                                                                                             

test                                                                                                                                      

10 row(s)

Took 0.3686 seconds                                                                                                                       

=> ["SYSTEM:CATALOG", "SYSTEM:CHILD_LINK", "SYSTEM:FUNCTION", "SYSTEM:LOG", "SYSTEM:MUTEX", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:TASK", "example_table", "test"]

hbase:002:0> describe 'example_table'

Table example_table is ENABLED                                                                                                            

example_table, {TABLE_ATTRIBUTES => {METADATA => {'hbase.store.file-tracker.impl' => 'DEFAULT'}}}                                         

COLUMN FAMILIES DESCRIPTION                                                                                                               

{NAME => 'mysql-hbase-test.test.t1', INDEX_BLOCK_ENCODING => 'NONE', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING =

> 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', COMPRESSION => 'NON

E', BLOCKCACHE => 'true', BLOCKSIZE => '65536 B (64KB)'}                                                                                  

1 row(s)

Quota is disabled

Took 0.1173 seconds                                                                                                                       

hbase:003:0> scan 'example_table',{FORMATTER=>'toString'}

ROW                                 COLUMN+CELL                                                                                           

 {"id":1}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.587, value={"before":null,"

                                    after":{"id":1,"remark":"第一行:row1","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin

                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"first","db":"test"

                                    ,"sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"

                                    thread":null,"query":null},"op":"r","ts_ms":1710901892115,"transaction":null}                         

 {"id":2}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.593, value={"before":null,"

                                    after":{"id":2,"remark":"第二行:row2","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin

                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"true","db":"test",

                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t

                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          

 {"id":3}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.596, value={"before":null,"

                                    after":{"id":3,"remark":"第三行:row3","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin

                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"last","db":"test",

                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t

                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          

3 row(s)

Took 0.0702 seconds                                                                                                                       

hbase:004:0> 

        By default, debezium-connector-mysql writes the existing data to Kafka when it starts. When building a real-time data warehouse, this makes it possible to synchronize existing and incremental data in a single real-time step, which greatly simplifies the CDC (Change Data Capture) process.
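        This initial snapshot behavior is controlled by Debezium's snapshot.mode connector property, which defaults to initial. For example, if only the schema and new changes should be captured, skipping the snapshot of existing rows, the source connector configuration could include the following line; this is an optional tuning and is not used in this demo:

"snapshot.mode": "schema_only"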

7. Real-time data synchronization test

        Make data changes on the MySQL master:

insert into test.t1 (remark) values ('第四行:row4');

update test.t1 set remark = '第五行:row5' where id = 4;

delete from test.t1 where id =1;

        Check the data changes in HBase:

hbase:004:0> scan 'example_table',{FORMATTER=>'toString'}

ROW                                 COLUMN+CELL                                                                                           

 {"id":1}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.587, value={"before":null,"

                                    after":{"id":1,"remark":"第一行:row1","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin

                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"first","db":"test"

                                    ,"sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"

                                    thread":null,"query":null},"op":"r","ts_ms":1710901892115,"transaction":null}                         

 {"id":2}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.593, value={"before":null,"

                                    after":{"id":2,"remark":"第二行:row2","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin

                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"true","db":"test",

                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t

                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          

 {"id":3}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.596, value={"before":null,"

                                    after":{"id":3,"remark":"第三行:row3","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin

                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"last","db":"test",

                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t

                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          

 {"id":4}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:38:18.788, value={"before":null,"

                                    after":{"id":4,"remark":"第四行:row4","createtime":"2024-03-20T02:38:18Z"},"source":{"version":"2.4.2.Fin

                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710902298000,"snapshot":"false","db":"test"

                                    ,"sequence":null,"table":"t1","server_id":1563306,"gtid":"ba615057-e11c-11ee-b80e-246e961c91f8:9","fil

                                    e":"mysql-bin.000001","pos":2679,"row":0,"thread":49,"query":null},"op":"c","ts_ms":1710902298665,"tra

                                    nsaction":null}                                                                                       

4 row(s)

Took 0.0091 seconds                                                                                                                       

hbase:005:0> 

        The delete and update operations executed in MySQL were not synchronized to HBase.

        Check the consumption status:

[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --group connect-hbase-sink-connector --describe --bootstrap-server node2:9092,node3:9092,node4:9092

Consumer group 'connect-hbase-sink-connector' has no active members.

GROUP                        TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID

connect-hbase-sink-connector mysql-hbase-test.test.t1 0          3               7               4               -               -               -

[root@vvml-yz-hbase-test~]#

        All the data changes were written to Kafka, but not all of them were consumed.

        Check the sink connector status:

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100  2168  100  2168    0     0   368k      0 --:--:-- --:--:-- --:--:--  423k

{

  "name": "hbase-sink-connector",

  "connector": {

    "state": "RUNNING",

    "worker_id": "172.18.4.71:8083"

  },

  "tasks": [

    {

      "id": 0,

      "state": "FAILED",

      "worker_id": "172.18.4.71:8083",

      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: \n\tat io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)\n\tat io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)\n\tat io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)\n\tat io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)\n\tat io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.\n\t... 16 more\n"

    }

  ],

  "type": "sink"

}

[root@vvml-yz-hbase-test~]#

        Check the log file ~/kafka_2.13-3.7.0/logs/connectDistributed.out on node3; the error message is as follows:

[2024-03-20 10:38:18,794] ERROR [hbase-sink-connector|task-0] WorkerSinkTask{id=hbase-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table:  (org.apache.kafka.connect.runtime.WorkerSinkTask:630)

org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: 

    at io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)

    at io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)

    at io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)

    at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)

    at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)

    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)

    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)

    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)

    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

    at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.

    ... 16 more

[2024-03-20 10:38:18,794] ERROR [hbase-sink-connector|task-0] WorkerSinkTask{id=hbase-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.

    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)

    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)

    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)

    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)

    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

    at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: 

    at io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)

    at io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)

    at io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)

    at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)

    at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)

    ... 11 more

Caused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.

    ... 16 more

        The reported error is: Row with specified row key already exists.

        The cause is that the sink connector converts MySQL update and delete operations into HBase inserts, while the automatically derived row key is the MySQL table's primary key. Since a row with that row key already exists, the insert fails. This synchronization behavior needs to be kept in mind.
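        Note that the failed sink task does not recover on its own; after addressing the issue it has to be restarted manually, for example via the Kafka Connect REST API (task id 0, as shown in the status output above):

# Restart the failed sink task
curl -X POST http://node2:8083/connectors/hbase-sink-connector/tasks/0/restart

        Depending on the kafka-connect-hbase version, an upsert-style write mode may also be available to avoid the duplicate-row-key error; check the connector documentation before relying on it.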

References:

Greenplum 实时数据仓库实践(5)——实时数据同步
Debezium MySQL Source Connector for Confluent Platform
Apache HBase Sink Connector for Confluent Platform
