一、Otter

语言:java

定位:基于数据库增量日志解析,准实时同步到本机房或异地机房的mysql/oracle数据库,一个分布式数据库同步系统。

1.工作原理:

 

2.原理描述

1. 基于Canal开源产品,获取数据库增量日志数据

2. 典型管理系统架构,manager web管理 + node工作节点

a. manager运行时推送同步配置到node节点

b. node节点将同步状态反馈到manager上

3. 基于zookeeper,解决分布式状态调度的,允许多node节点之间协同工作。

二、Canal

1.定义

Canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

Canal结构及支持中间件示意图:

 

2.工作原理

canal的工作原理就是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,ElasticSearch等等。

三、功能业务实现

1. 异构库同步  mysql -> mysql/oracle

2. 单机房数据库同步<数据库之间RTT < 1ms>

a. 数据库版本升级

b. 数据表迁移

3. 异地机房数据库同步

4. 双向同步

四、服务调度模型及核心算法

1.背景

在异地机房数据传输时,对网络间传输带宽、延迟以及网络中断问题,TCP/IP协议早有处理方案

 

2.Nagle算法

   a. 构建RingBuffer (可以基于内存控制模式/数量控制模式)

   b. 允许客户端指定batchSize获取

   c. 指定定batchSize + timeout获取

建议值:batchSize=4000(约4M) , timeout=500,内存控制模式

3.滑动窗口

 

说明:

数据通过select模块串行获取canal的批数据,注意是串行获取,每批次获取到的数据,就会有一个全局标识,称之为processId。select模块获取到数据后,将其传递给后续的ETL模型。 这里E和T模块会是一个并行处理将数据最后传递到Load时,会根据每批数据对应的processId,按照顺序进行串行加载。 ( 比如有一个processId=2的数据先到了Load模块,但会阻塞等processId=1的数据Load完成后才会被执行)

简单一点说,Select/Load模块会是一个串行机制来保证binlog处理的顺序性,Extract/Transform会是一个并行,加速传输效率。

3.1 并行度

类似于tcp滑动窗口大小,比如整个滑动窗口设置了并行度为5时,只有等第一个processId Load完成后,第6个Select才会去获取数据。

3.2 数据可靠性

如何保证数据不丢:2pc (get/ack)如何处理重传协议:get/ack/rollback如何支持并行化:多get cursor+ack curosr

3.3 编程模型抽象(SEDA模型) 

 

说明: 将并行化调度的串行/并行处理,进行隐藏,抽象了await/single的接口,整个调度称之为仲裁器。(有了这层抽象,不同的仲裁器实现可以解决同机房,异地机房的同步需求)

3.4 模型接口

await模拟object获取锁操作notify被唤醒后提交任务到thread poolssingle模拟object释放锁操作,触发下一个stage

这里使用了SEDA模型的优势:

共享thread pool,解决流控机制划分多stage,提升资源利用率统一编程模型,支持同机房,跨机房不同的调度算法

3.5 数据传输

有了一层SEDA调度模型的抽象,S/E/T/L模块之间互不感知,那几个模块之间的数据传递,需要有一个机制来处理,这里抽象了一个pipe(管道)的概念。

原理:

stage | pipe | stage

基于pipe实现:

in memory (两个stage经过仲裁器调度算法选择为同一个node时,直接使用内存传输)rpc call (<1MB)file(gzip) + http多线程下载

在pipe中,通过对数据进行TTL控制,解决TCP协议中的丢包问题控制。

4.仲裁器算法

主要包括:令牌生成(processId) + 事件通知

令牌生成:

基于AtomicLong.inc()机制,(纯内存机制,解决同机房,单节点同步需求,不需要多节点交互)基于zookeeper的自增id机制,(解决异地机房,多节点协作同步需求)

事件通知: (简单原理: 每个stage都会有个block queue,接收上一个stage的single信号通知,当前stage会阻塞在该block queue上,直到有信号通知)

block queue + put/take方法,(纯内存机制)block queue + rpc + put/take方法 (两个stage对应的node不同,需要rpc调用,需要依赖负载均衡算法解决node节点的选择问题)block queue + zookeeper watcher ( )

5.负载均衡算法

Stick: 类似于session stick技术,一旦第一次选择了node,下一次选择会继续使用该node。 (有一个好处,资源上下文缓存命中率高)Random: 随机算法RoundRbin: 轮询算法

注意点:每个node节点,都会在zookeeper中生成Ephemeral节点,每个node都会缓存住当前存活的node列表,node节点消失,通过zookeeper watcher机制刷新每个node机器的内存。然后针对每次负载均衡选择时只针对当前存活的节点,保证调度的可靠性。

6.数据同步算法

1. insert + insert -> insert (数据迁移+数据增量场景)

2. insert + update -> insert  (update字段合并到insert)

3. insert + delete -> delete

4. update + insert -> insert (数据迁移+数据增量场景)

5. update + update -> update

6. update + delete -> delete

7. delete + insert -> insert

8. delete + update -> update (数据迁移+数据增量场景)

9. delete + delete -> delete

说明:

1. insert/行记录update 执行merge sql,解决重复数据执行

2. 合并算法执行后,单pk主键只有一条记录,减少并行load算法的复杂性(比如batch合并,并行/串行等处理)

7.数据入库算法

   入库算法采取了按pk hash并行载入+batch合并的优化

   a. 打散原始数据库事务,预处理数据,合并insert/update/delete数据(参见合并算法),然后按照table + pk进行并行(相同table的数据,先执行delete,后执行insert/update,串行保证,解决唯一性约束数据变更问题),相同table的sql会进行batch合并处理

   b. 提供table权重定义,根据权重定义不同支持业务上类事务功能,并行中同时有串行权重控制。

   业务类事务描述:比如用户的一次交易付款的流程,先产生一笔交易记录,然后修改订单状态为已付款。 用户对这事件的感知,是通过订单状态的已付款,然后进行查询交易记录。

   所以,可以对同步进行一次编排: 先同步完交易记录,再同步订单状态。 给同步表定义权重,权重越高的表相对重要,放在后面同步,最后达到的效果可以保证业务事务可见性的功能,快的等慢的。

8.双向回环控制算法

支持mysql/oracle的异构数据库的双向回环需要支持级联同步,比如A<->B->C,A同步到B的数据,不能从B回到A,但需要同步到C

实现思路:

利用事务机制,在事务头和尾中插入同步标识解析时识别同步标识,判断是否需要屏蔽同步

几点注意:

基于标准SQL实现可以支持mysql/oracle等异构数据库的双向同步事务完整解析&完整可见性事务被拆开同步,会出现部分回环同步,数据不一致。 比如一个事务被拆分为了3截,中间一截因为没有事务头和尾的标识,如果发生同步了,就会导致数据不一致。

五、映射规则

因为各系统间数据上的特殊业务,比如:

同步数据同时,需要同步数据关联的文件同步会员数据,敏感字段信息需要做数据过滤两地的数据库可能为异构数据库,需要做字段类型,名字等转化。

为解决这些业务,引入了映射规则这一概念,用于描述这样的一种同步业务的关系,其粒度可以精确到一张表,或者是一整个库。

表映射

每个pipeline可以设置多个映射规则,每个映射规则描述一个数据库同步的内容,比如源表是哪张,同步到哪张目标表。

权重的概念

因为采用了pk hash的并行载入算法,会将原先binlog中的事务进行打散做并行处理提升同步性能。原先事务被拆散后,通过这个权重概念,来提供业务上类事务功能

实例:

两张表,product和product_detail,detail上有product_id与product进行关联。1对1的关系业务上插入数据可以通过事务,先插入product,然后插入product_detail,最后一起提交到数据库,页面上通过查询用户的product表的数据,发现有产品记录,然后通过product_id去查询product_detail表,查到后才显示产品页面假如同步到目标库后,打散事务后,同步过程如果先插入了product表,后插入product_detail表,这时如果有用户访问该产品记录,可能就会遇到product_detail不存在的情况,从而导致页面出错。所以,我们通过权重定义,比如将product_detail权重定义为5,将product定义为10。 优先同步权重低的表数据,最终可以保证查询product有数据后,product_detail一定有数据,避免业务出错。

视图映射

映射规则配置页面,可以选择视图模式为:include或exclude,代表正向匹配或者逆向排除。视图配置页面,只支持存在的数据表(因为要获取数据表结构,所以.*等正则的数据表不支持配置该功能)视图配置列表,左右选中列表会按照顺序进行对应,做映射时需按照顺序进行选择。

如果要排除表字段A的同步,则只需要选择为exclude模式,然后视图编辑页面选择左右皆选择A字段即可,点击保存。

字段组映射

字段组同步需求:

文件同步:一条记录对应的图片,可能会有一个或者多个字段,比如会有image_path,image_version来决定图片,所以我们可以定义这两个字段为一组,只要满足组内任意一个字段的变更,就会认为需要文件同步。组同步,比如国家,省份,城市,可能在数据库为三个字段。 如果是双A同步,两地同时修改这些字段,但业务上可能在A地修改了国家为美国,在B地修改为省份为浙江,然后一同步,最终就会变成美国,浙江这样的情况。 这种情况可以通过group来解决,将国家,省份,城市做一个group,组内任何一个字段发生了变更,其余字段会做为整体一起变更。

六、多种同步方式配置

a. 单向同步

单向同步为最基本的同步方式,目前支持mysql -> mysql/oracle的同步。

基本配置方式就如操作视频中所演示的,操作步骤:

配置一个channel配置一个pipeline对应node机器选择时,建议遵循:S/E节点选择node需尽可能离源数据库近,T/L节点选择node则离目标数据库近。 如果无法提供双节点,则选择离目标数据库近的node节点相对合适。配置一个canal定义映射关系canal中配置解析的数据库ip需要和映射关系中源表对应的数据库ip一致 ps. 映射关系进行匹配的时候是基于表名,虽然数据库ip不匹配也会是有效。

b. 双向同步

双向同步可以理解为两个单向同步的组合,但需要额外处理避免回环同步,回环同步算法

同时,因为双向回环控制算法会依赖一些系统表,需要在需要做双向同步的数据库上初始化所需的系统表。

配置上相比于单向同步有一些不同,操作步骤:

配置一个channel配置两个pipeline 注意:两个单向的canal和映射配置,在一个channel下配置为两个pipeline。 如果是两个channel,每个channel一个pipeline,将不会使用双向回环控制算法,也就是会有重复回环同步。每个pipeline各自配置canal,定义映射关系

c. 双A同步

双A同步相比于双向同步,主要区别是双A机房会在两地修改同一条记录,而双向同步只是两地的数据做互相同步,两地修改的数据内容无交集。

所以双A同步需要额外处理数据同步一致性问题,目前开源版本主要是提供了单向回环补救的一致性方案。

双A同步相比于双向同步,整个配置主要是一些参数上有变化,具体步骤:

配置一个channel 配置两个pipeline

* 注意:除了需要定义一个主站点外,需要在高级设置中将一个pipeline的“支持DDL”设置为false,另一个设置为true,否则将提示“一个channel中只允许开启单向ddl同步!”错误

每个pipeline各自配置canal,定义映射关系

d. 级联同步

单向同步/双向同步,都是针对一个channel下的多pipeline配置进行控制,是否可以使用多个channel完成类似级联同步的功能

几种级联同步

A->B->C ,A单向同步到B,B再单向同步到CA<->B->C,A和B组成一个双向,B再单向同步到CA<->B<-C,A和B组成一个双向,C将数据单向同步B,也就是B是一个接受多M同步写入的的节点,目前mysql不支持A<->B->C,B-/->D,A和B组成一个双向,B再单向同步到C,但A同步到B的数据不同步到D,但B地写入的数据同步到D,目前mysql不支持。

对应操作步骤:

目前channel之间的级联同步,不需要设置任何参数,只要通过canal进行binlog解析即可。

针对级联屏蔽同步,需要利用到自定义同步标记的功能,比如A->B,B同步到C但不同步到D。需要在A->B的同步高级参数里定义NOT_DDD,然后在B同步到D的高级参数里也定义NOT_DDD。 原理:这样在B解析到A->B写入的同步标记为NOT_DDD,与当前同步定义的NOT_DDD进行匹配,就会忽略此同步。 

e. 多A同步

基于以上的单向/双向/双A/级联同步,可以随意搭建出多A同步,不过目前受限于同步数据的一致性算法,只能通过星形辐射,通过级联同步的方式保证全局多A机房的数据一致性。 比如图中B和C之前的一致性同步,需要通过主站点A来保证。

七、服务可用性及数据一致性实现

1.一致性

需要处理一致性的业务场景:

多地修改 (双A同步)同一记录,同时变更

同一记录定义:具体到某一张表,某一条pk,某一字段

同时变更定义:A地写入的数据在B地还未可见的一段时间范围

基本思路

事前控制:比如paoxs协议,在多地数据写入各自数据存储之前,就已经决定好最后保留哪条记录事后处理:指A/B两地修改的数据,已经保存到数据库之后,通过数据同步后保证两数据的一致性

1.单向回环补救

适用场景: A地和B地数据不对等,比如A地为主,写入量比较高,B地有少量的数据写入

单向回环流程:

A->B同步的数据,会再次进入B->A队列,形成一次单向回环B->A同步的数据,不会进入A->B队列(回环终止,保证不进入死循环)

存在的问题:存在同步延迟时,会出现版本丢失高/数据交替性变化

比如A同一条记录变更了10个版本,而且很快同步到了B,而B因为同步数据大,同步延迟,后续单向回环中将10个版本又在A进行了一次重放,导致出现数据交替比如B同一条记录变更了10个版本,而且很快同步到了A,而A因为同步延迟,将一个比较早的版本同步到了B,后续通过单向回环,将此记录重放到了A,导致之前B到A的10个版本丢失。

解决方案:

反查数据库同步 (以数据库最新版本同步,解决交替性,比如设置一致性反查数据库延迟阀值为60秒,即当同步过程中发现数据延迟超过了60秒,就会基于PK反查一次数据库,拿到当前最新值进行同步,减少交替性的问题)字段同步 (降低冲突概率)同步效率 (同步越快越好,降低双写导致版本丢失概率,不需要构建冲突数据KV表)同步全局控制 (比如B->A和A->B一定要一起启动,一起关闭,保证不会出现一边数据一直覆盖另一边,造成比较多的版本丢失)

同步全局控制方案:(分布式Permit)

注意:A,B,C三点状态都正常才允许进行同步(解决数据单向覆盖)。 任何一边的canal不正常工作,都应该停掉整个双向同步,及时性越高越好。

2.时间交集补救

 

算法描述:

1. 首先定义两个时间概念

数据变更时间A :代表业务数据在A地数据库中产生的时间,即图中的时间A数据同步时间B:代表数据变更载入到B地数据库的时间,即图中的时间B

2. 针对每条或者一批数据都记录变更时间A和同步时间B,同时保留历史同步过的数据记录

3. 图中纵轴为时间轴,Aa代表从数据库A同步到数据库B的一个同步过程,Ba代表从数据库B到同步到的数据库A的一个同步过程,每个同步过程在纵轴上会有两个点,分别代表变更时间A和同步时间B。

4. 根据同一时间的定义,在两边数据库的各自同步过程中,以数据库A为例,在数据库B的同步过程找到与Aa有时间交集的批次,比如这里就是Aa 与 (Ba , Bb , Bc)有时间交集

5. 针对步骤4中的批次,根据同一数据的定义,在交集的每个批次中,比如首先拿Aa和Ba的历史同步数据记录,根据同一数据定义进行查找,然后再是Aa和Bb,依次类推。

6. 针对步骤5中找到的同一数据,最后确定为需要进行单向回环的一致性算法的数据。

此方案相比于单向回环方案:减少单向回环同步的数据量,解决A和B地数据对等的case,不过目前开源版本暂未实现。

2.可用性

1. node和manager独立部署

manager对于node来说可以是一个optional的环境,只有在第一次启动任务时需要,node一旦启动了同步任务后,无论manager是否可用,不能影响正常同步。

需要考虑的点:

node对于配置需要有本地cachenode推送统计信息到manager需要有容错处理,需要考虑manager failover(一台manager挂了,需要链接到另一台)

2. 异常流程处理机制

仲裁器设计了三种异常机制指令:

WARNING : 只发送报警信息,不做任何S/E/T/L调度干预ROLLBACK : 尝试获取分布式锁,避免并发修改,其次修改分布式Permit为false,停止后续的所有S/E/T/L调度,然后删除所有当前process调度信息,通过zookeeper watcher通知所有相关node,清理对应process的上下文,pipe的数据存储会通过TTL来进行清理,不需要ROLLBACK干预。完成后,释放锁操作RESTART:前面几个步骤和ROLLBACK基本类似,唯一不同点在于,在释放锁之前会尝试修改分布式Permit为true,重新开启同步,然后释放锁。

3. node节点监控原理

每个node在启动完成后,都会在zookeeper中创建一个Ephemerals节点(此节点特点,当node节点发生crash之后,与zookeeper建立的sesstion因为没有心跳,超过一定时间后就会出现SesstionExpired,然后zookeeper会删除该节点)manager监听整个node节点列表的变化,任何一个node节点的消失,都会收到zookeeper watcher通知,与内存中上一个版本进行比较,判断出当前消失的node节点针对该消失的node节点,会有一段保护期(因为可能正常的发布,会关闭node,同样会触发该watcher),如果该node在保护期内重新启动了,则不做任何处理。默认保护期为90秒如果保护期内node节点未正常启动,说明node是异常crash,通过查询配置,找到使用了该node的所有同步任务,对每个同步任务发起一个RESTART指令,让所有同步任务重新做一次负载均衡选择,避免挂在老的node上,一直等其结果返回。

4. 数据库切换

比如会有一套管理系统,配置当前mysql主备的关系,发现主机不可用时,他们会通过该系统切备机变为主机,然后推送该配置到所有节点,然后各个客户端收到主备切换消息,更改自己的数据库链接,完成数据库切换。

因为内部系统无法直接开源,在服务中自带了一个简单版的数据库主备推送的机制,通过页面上的切换按钮,就可以通知到所有节点,切换数据库链接,包括canal的binlog解析和数据库loader等。

同步服务内部配置中称之为主备配置(media配置),为一对主备IP,定义一个groupKey。然后在各个地方使用该groupKey。

canal中可以选择HA机制为media,然后填入对应的groupKey即可

发现需要做数据库切换了,可以直接点击切换按钮,目前实现定时轮询非推送,一般需要1分钟左右才会正式生效,或者发生一次RESTART指令。同样,主备切换可以暴露为服务,方便大家接入各自的数据库管理平台。

八、S/E/T/L stage阶段模型

说明:为了更好的支持系统的扩展性和灵活性,将整个同步流程抽象为Select/Extract/Transform/Load,这么4个阶段。

Select阶段: 为解决数据来源的差异性,比如接入canal获取增量数据,也可以接入其他系统获取其他数据等。

Extract/Transform/Load 阶段:类似于数据仓库的ETL模型,具体可为数据join,数据转化,数据Load的

 

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: