大家好我是苏麟,今天聊聊数据同步 .

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。

MySQL =>ES(单向)

同步方式

首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本 4 种方式 , 全量同步(首次)+增量同步(新数据)

1.定时任务 (推荐 : 简单)

定时任务 : 比如1分钟1次,找到 MySQL 中过去几分钟内(至少是定时周期的2 倍)发生改变的数据,然后更新到 ES.

优点:简单易懂、占用资源少、不用引入第三方中间件缺点:有时间差应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改

2.双写 

双写 : 写数据的时候,必须也去写 ES;更新删除数据库同理。(事务:建议先保证 MySQL写成功,如果ES 写失败了,可以通过定时任务 + 日志 +告警进行检测和修复(补偿))

优点 : 不知道缺点 : 繁琐

3. Logstash

用 Logstash 数据同步管道 (一般要配合 kafka 消息队列 + beats 采集器)

优点 : 用起来方便,插件多缺点 : 成本更大 : 一般要配合其他组件使用 (比如 kafka) , 维护成本 : 多维护一个组件 , 学习成本 : 学习使用

4.Canal (推荐 : 简单 , 实时性非常强)

Canal 监听 MySQL Binlog,实时同步

优点 : 实时同步 , 实时性非常强缺点 :  忽略不计   (MySQL8版本可能连接失败)

定时任务

找到 MySQL 中过去几分钟内发生改变的数据,然后更新到 ES.

双向写入

写数据的时候,也去写 ES .

这个不推荐!

Logstash

传输 和 处理 数据的管道

文章 : Getting Started with Logstash | Logstash Reference [7.17] | Elastic

下载 : https://artifacts.elastic.co/downloads/logstash/logstash-7.17.9-windows-x86_64.zip

快速开始 : Running Logstash on Windows | Logstash Reference [7.17] | Elastic 

这里需要学习成本 , 有兴趣的小伙伴自己了解 , 这里不过多赘述 .

订阅数据库流水的同步方式 Canal

地址 : GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

原理 : 数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理canal : 帮你监听 binlog,并解析 binlog 为你可以理解的内容。 

它伪装成了 MySQL 的从节点,获取主节点给的 binlog,如图:

快速开始 : QuickStart · alibaba/canal Wiki · GitHub

windows 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.ini 文件:

Linux 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.cnf 文件:

[mysqld]

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式

server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MSQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

这里有个报错 java 找不到,修改 startup.bat 脚本为你自己的 java home

set JAVA_HOME=C:\Users\59278\.jdks\corretto-1.8.0_302 (自己MySQL路径)

set PATH=%JAVA_HOME%\bin;%PATH%

Java 中引入依赖

com.alibaba.otter

canal.client

1.1.0

Demo  

import java.net.InetSocketAddress;

import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.common.utils.AddressUtils;

import com.alibaba.otter.canal.protocol.Message;

import com.alibaba.otter.canal.protocol.CanalEntry.Column;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

public static void main(String args[]) {

// 创建链接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),

11111), "example", "", "");

int batchSize = 1000;

int emptyCount = 0;

try {

connector.connect();

connector.subscribe(".*\\..*");

connector.rollback();

int totalEmptyCount = 120;

while (emptyCount < totalEmptyCount) {

Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

emptyCount++;

System.out.println("empty count : " + emptyCount);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

} else {

emptyCount = 0;

// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);

printEntry(message.getEntries());

}

connector.ack(batchId); // 提交确认

// connector.rollback(batchId); // 处理失败, 回滚数据

}

System.out.println("empty too many times, exit");

} finally {

connector.disconnect();

}

}

private static void printEntry(List entrys) {

for (Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),

e);

}

EventType eventType = rowChage.getEventType();

System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));

for (RowData rowData : rowChage.getRowDatasList()) {

if (eventType == EventType.DELETE) {

printColumn(rowData.getBeforeColumnsList());

} else if (eventType == EventType.INSERT) {

printColumn(rowData.getAfterColumnsList());

} else {

System.out.println("-------> before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("-------> after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private static void printColumn(List columns) {

for (Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());

}

}

}

在启动之后 修改MySQL中数据 , java控制台就能实时输出 .

这期就到这里 , 下期见 !

好文推荐

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: