1. 背景

项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。

2. 同步原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流),转换为json格式 Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES。

优点: 可以完全和业务代码解耦,增量日志订阅。

缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。

3. 环境信息

名称版本MySQL8.0.35elasticsearch7.17.9canal1.1.7jdk1.8

本文以MySQL中的表ibds2.proof_data_history_202311为例进行数据同步实验。

4. 部署配置

本文的的重点是讲述canal的配置与使用,不再详细描述MySQL与ES的部署和配置。

4.1. MySQL

MySQL的部署与配置相对较简单,需要主要的是需要开启binlog,且配置binlog-format 为ROW模式。此处不再做详细描述。

4.2. Elasticsearch

ES的部署与配置也比较简单,此处不再做详细描述。

建议安装同版本的分词器插件。

4.3. canal

版本下载地址:

deployer:

https://download.fastgit.org/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

adapter:

https://download.fastgit.org/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz

4.3.1. deployer

4.3.1.1. 配置

将canal.deployer-1.1.7.tar.gz上传到/data/canal/Server,并进行解压:

cd /data/canal/Server

tar -zxvf canal.deployer-1.1.7.tar.gz

修改配置文件conf/example/instance.properties,按如下配置即可,主要是修改数据库相关配置:

## mysql serverId , v1.0.26+ will autoGen 不要与MySQL的id重复

canal.instance.mysql.slaveId=10

# enable gtid use true/false 是否使用gtid

canal.instance.gtidon=true

# position info MySQL数据库信息

canal.instance.master.address=192.168.10.101:3307

# username/password

canal.instance.dbUsername=root

canal.instance.dbPassword=X87_5w2Anxp3

canal.instance.connectionCharset = UTF-8

# table regex 需要订阅binlog的表的过滤正则表达式

canal.instance.filter.regex=.*\\..*

4.3.1.2. 启动

进入bin目录,启动deployer:

cd /data/canal/Server/bin

./startup.sh

4.3.2. adapter

在adapter的配置中,一定要先在ES中创建索引,然后在启动adapter。

4.3.2.1. 配置

将canal.adapter-1.1.7.tar.gz上传到/data/canal/Adapter,并进行解压:

cd /data/canal/Adapter

tar -zxvf canal.adapter-1.1.7.tar.gz

修改配置文件conf/application.yml,按如下配置即可,主要是修改canal-server配置、数据源配置和客户端适配器配置:

server:

port: 8081

spring:

jackson:

  date-format: yyyy-MM-dd HH:mm:ss

  time-zone: GMT+8

  default-property-inclusion: non_null

canal.conf:

mode: tcp #tcp kafka rocketMQ rabbitMQ

flatMessage: true

zookeeperHosts:

syncBatchSize: 1000

retries: -1

timeout:

accessKey:

secretKey:

consumerProperties:

   # canal tcp consumer

  canal.tcp.server.host: 192.168.10.101:11111 # 需要修改

  canal.tcp.zookeeper.hosts:

  canal.tcp.batch.size: 500

  canal.tcp.username:

  canal.tcp.password:

   # kafka consumer

  kafka.bootstrap.servers: 127.0.0.1:9092

  kafka.enable.auto.commit: false

  kafka.auto.commit.interval.ms: 1000

  kafka.auto.offset.reset: latest

  kafka.request.timeout.ms: 40000

  kafka.session.timeout.ms: 30000

  kafka.isolation.level: read_committed

  kafka.max.poll.records: 1000

   # rocketMQ consumer

  rocketmq.namespace:

  rocketmq.namesrv.addr: 127.0.0.1:9876

  rocketmq.batch.size: 1000

  rocketmq.enable.message.trace: false

  rocketmq.customized.trace.topic:

  rocketmq.access.channel:

  rocketmq.subscribe.filter:

   # rabbitMQ consumer

  rabbitmq.host:

  rabbitmq.virtual.host:

  rabbitmq.username:

  rabbitmq.password:

  rabbitmq.resource.ownerId:

srcDataSources:

  defaultDS:

    url: jdbc:mysql://192.168.10.101:3307/ibds2?useUnicode=true # 需要修改

    username: root # 需要修改

    password: X87_5w2Anxp3 # 需要修改

canalAdapters:

- instance: example # canal instance Name or mq topic name

  groups:

  - groupId: g1

    outerAdapters:

    - name: logger

#     - name: rdb

#       key: mysql1

#       properties:

#         jdbc.driverClassName: com.mysql.jdbc.Driver

#         jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true

#         jdbc.username: root

#         jdbc.password: 121212

#         druid.stat.enable: false

#         druid.stat.slowSqlMillis: 1000

#     - name: rdb

#       key: oracle1

#       properties:

#         jdbc.driverClassName: oracle.jdbc.OracleDriver

#         jdbc.url: jdbc:oracle:thin:@localhost:49161:XE

#         jdbc.username: mytest

#         jdbc.password: m121212

#     - name: rdb

#       key: postgres1

#       properties:

#         jdbc.driverClassName: org.postgresql.Driver

#         jdbc.url: jdbc:postgresql://localhost:5432/postgres

#         jdbc.username: postgres

#         jdbc.password: 121212

#         threads: 1

#         commitSize: 3000

#     - name: hbase

#       properties:

#         hbase.zookeeper.quorum: 127.0.0.1

#         hbase.zookeeper.property.clientPort: 2181

#         zookeeper.znode.parent: /hbase

    - name: es7 # 需要修改

      hosts: 192.168.10.101:9300 # 127.0.0.1:9200 for rest mode # 需要修改

      properties:

        mode: transport # or rest

         # security.auth: test:123456 # only used for rest mode

        cluster.name: my-es # 需要修改

#     - name: kudu

#       key: kudu

#       properties:

#         kudu.master.address: 127.0.0.1 # ',' split multi address

#     - name: phoenix

#       key: phoenix

#       properties:

#         jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver

#         jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db

#         jdbc.username:

#         jdbc.password:

配置重点

jdbc:mysql://192.168.10.101:3307/ibds2?useUnicode=true中的数据库名称 cluster.name:es集群名称根据自己的实际的配置,我的是my-es -name: es7 这个很重要一会儿要用

添加配置文件canal-adapter/conf/es7/proof_202311.yml,用于配置MySQL中的表与Elasticsearch中索引的映射关系:

dataSourceKey: defaultDS # 与application.yml中的srcDataSources对应

destination: example # canal的instance

groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据

esMapping:

_index: proof_data_history_202311 # 要在es中创建的索引名称

_type: _doc

_id: id # 与sql中的id一致

upsert: true

sql: "SELECT

      t.id as id,

      t.biz_id as biz_id,

      t.biz_type as biz_type,

      t.encrypt_mode as encrypt_mode,

      t.sign_user as sign_user,

      t.tx_hash as tx_hash,

      t.from_addr as from_addr,

      t.data as data,

      t.channel_id as channel_id,

      t.block_height as block_height,

      t.tx_time as tx_time

      FROM

      proof_data_history_202311 t"

commitBatch: 3000

4.3.2.2. 创建索引

根据MySQL的表结构信息,在ES中创建对应的索引信息。

在ES中创建索引的方法有多种,推荐两种方法:

kinbana:

PUT /proof_data_history_202311

{

   "mappings":{

       "properties":{

         "id": {

           "type": "long"

        },

         "biz_id": {

           "type": "text"

        },

         "biz_type": {

           "type": "long"

        },

         "encrypt_mode": {

           "type": "long"

        },

         "sign_user": {

           "type": "text"

        },

         "tx_hash": {

           "type": "text"

        },

         "from_addr": {

           "type": "text"

        },

         "data": {

           "type": "text"

        },

         "channel_id": {

           "type": "text"

        },

         "block_height": {

           "type": "long"

        },

         "tx_time": {

           "type": "date"

}

    }

  }

}

命令行:

curl -XPUT "http://192.168.10.101:9200/proof_data_history_202311" -H 'Content-Type: application/json' -d'

{

   "mappings":{

       "properties":{

         "id": {

           "type": "long"

        },

         "biz_id": {

           "type": "text"

        },

         "biz_type": {

           "type": "long"

        },

         "encrypt_mode": {

           "type": "long"

        },

         "sign_user": {

           "type": "text"

        },

         "tx_hash": {

           "type": "text"

        },

         "from_addr": {

           "type": "text"

        },

         "data": {

           "type": "text"

        },

         "channel_id": {

           "type": "text"

        },

         "block_height": {

           "type": "long"

        },

         "tx_time": {

           "type": "date"

}

    }

  }

}'

4.3.2.3. 启动adapter

cd /data/canal/Adapter/bin

./startup.sh

4.3.3. 数据初始化

如果mysql中有数据就需要调用一次全量同步,如果mysql没数据,或者数据没用,就不需要调用此步骤。

canal-adapter提供一个REST接口可全量同步数据到ES:

# 全量同步proof_data_history_202311表数据到es的proof_data_history_202311索引中,路径中的es7是上一步的文件夹名

curl -X POST http://192.168.10.101:8081/etl/es7/proof_202311.yml

以120万行实验数据为例,分别进行ES中没有数据和已有数据的两种不同情况下的全量同步实验。

ES中没有数据,同步时间86秒:

2023年 11月 28日 星期二 09:17:55 CST开始执行全量同步

{"succeeded":true,"resultMessage":"导入ES 数据:1200013 条"}

2023年 11月 28日 星期二 09:19:21 CST完成执行全量同步

ES中已有数据,同步时间87秒:

2023年 11月 28日 星期二 09:27:46 CST开始执行全量同步

{"succeeded":true,"resultMessage":"导入ES 数据:1200013 条"}

2023年 11月 28日 星期二 09:29:03 CST完成执行全量同步

5. 补充资料

在实际生产中,server和adapter往往部署在不同的网络区域,这里就涉及到网络安全访问策略的问题,下面梳理出了需要的网络安全访问策略。

源端目标端Adapter端IP数据源端MySQL地址和端口Adapter端IP数据目的地ES地址和端口(9200、9300)Adapter端IPServer端地址和11110、11111、11112端口Server端IP数据源端MySQL地址和端口

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 


大家都在找:

Mysql:mysql主从复制

elasticsearch:elasticsearch怎么读

大家都在看: