使用背景

项目后台使用mongoDb,门户页使用elasticsearch,需求是将mongoDb的数据同步到elasticsearch中。于是联想到mysql有基础binlog开发的canal。mg是否有类似binlog这种机制实现的实时监控同步工具。

方案

MongoShake(项目地址:https://github.com/alibaba/MongoShake)ChangeStream (monggodb的3.6版本之后推出的)

MongoShake

  MongoShake是一个以go语言编写的通用的平台型服务,通过读取MongoDB集群的Oplog日志, 对MongoDB的数据进行复制,后续通过操作日志实现特定需求。   MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。 阿里官方文档: https://developer.aliyun.com/article/763827?spm=a2c6h.13813017.content3.1.3eb032a4yx0ZoM#slide-11 项目地址: https://github.com/alibaba/MongoShake

ChangeStream

changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更 想必对mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具

关于changestream做如下说明,提供参考

在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群

changestream可用于监听的mongodb目标类型

单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持整个集群,整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持

一个Change Stream Event的基本结构如下所示:

{

_id : { },

"operationType" : "",

"fullDocument" : { },

"ns" : {

"db" : "",

"coll" : "

},

"documentKey" : { "_id" : },

"updateDescription" : {

"updatedFields" : { },

"removedFields" : [ "", ... ]

}

"clusterTime" : ,

"txnNumber" : ,

"lsid" : {

"id" : ,

"uid" :

}

}

关于上面的数据结构,做简单的解释说明,

_id,变更事件的Token对象operationType,变更类型(见下面介绍)fullDocument,文档内容ns,监听的目标ns.db,变更的数据库ns.coll,变更的集合documentKey,变更文档的键值,含_id字段updateDescription,变更描述updateDescription.updatedFields,变更中更新字段updateDescription.removedFields,变更中删除字段clusterTime,对应oplog的时间戳txnNumber,事务编号,仅在多文档事务中出现,4.0版本支持lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持

Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个:

insert,插入文档delete,删除文档replace,替换文档,当执行replace操作指定upsert时,可能是insert事件update,更新文档,当执行update操作指定upsert时,可能是insert事件invalidate,失效事件,比如执行了collection.drop或collection.rename 以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等

Java客户端操作changestream

1、引入maven依赖

org.springframework.boot

spring-boot-starter-data-mongodb

org.springframework.boot

spring-boot-starter-data-mongodb-reactive

2、注册MessageListenerContainer

package com.iflytek.databus.conf;

import com.iflytek.databus.entity.OriginalOplogCkpt;

import com.iflytek.databus.init.DocumnetMessageListener;

import com.mongodb.client.model.changestream.FullDocument;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;

import org.bson.BsonDocument;

import org.bson.Document;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.mongodb.core.MongoTemplate;

import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;

import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;

import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;

import java.util.concurrent.Executor;

import static org.springframework.data.mongodb.core.aggregation.Aggregation.match;

import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;

import static org.springframework.data.mongodb.core.query.Criteria.where;

/**

* @author xyding6

* @Classname MongoConfig

* @Description TODO

* @date 2023/2/21

*/

@Configuration

@Slf4j

public class MongoConfig {

@Value("${spring.data.mongodb.database}")

String mongoDatabase;

@Bean

MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumnetMessageListener documnetMessageListener,Executor asyncExecutor) {

// Executor executor = Executors.newSingleThreadExecutor();

MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, asyncExecutor) {

@Override

public boolean isAutoStartup() {

return true;

}

};

//oplog_check_point

OriginalOplogCkpt originalOplogCkpt = template.findById("oplog_check_point", OriginalOplogCkpt.class, "original_oplog_ckpt");

ChangeStreamRequest.ChangeStreamRequestBuilder documentChangeStreamRequestBuilder = ChangeStreamRequest.builder(documnetMessageListener)

.database(mongoDatabase)

// .collection("original_aydoc_test") //需要监听的集合名,不指定默认监听数据库的

.filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete")))) //过滤需要监听的操作类型,可以根据需求指定过滤条件

.fullDocumentLookup(FullDocument.UPDATE_LOOKUP);//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息

if(originalOplogCkpt != null && StringUtils.isNotBlank(originalOplogCkpt.getResumeToken())){

log.info("==mongo监听从位点处恢复执行,位点resumeToken:{}==",originalOplogCkpt.getResumeToken());

documentChangeStreamRequestBuilder.resumeToken(BsonDocument.parse(originalOplogCkpt.getResumeToken()));

}

ChangeStreamRequest request = documentChangeStreamRequestBuilder.build();

//TODO 测试暂不开启

messageListenerContainer.register(request, Document.class);

return messageListenerContainer;

}

}

这一步是注册自己编写的监听器,oplog_check_point这是本地mg的一个集合,用来记录每次位点的信息。

3. 编写MessageListener

package com.iflytek.databus.init;

import com.google.common.collect.Lists;

import com.iflytek.databus.entity.OriginalOplogCkpt;

import com.iflytek.databus.service.TransToEsService;

import com.iflytek.databus.utils.EntityUtils;

import com.iflytek.databus.utils.GsonUtils;

import com.mongodb.client.model.changestream.ChangeStreamDocument;

import com.mongodb.client.model.changestream.OperationType;

import lombok.extern.slf4j.Slf4j;

import org.bson.Document;

import org.elasticsearch.client.RestHighLevelClient;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.mongodb.core.MongoTemplate;

import org.springframework.data.mongodb.core.messaging.Message;

import org.springframework.data.mongodb.core.messaging.MessageListener;

import org.springframework.stereotype.Component;

import java.util.Date;

/**

* @author xyding6

* @Classname DocumnetMessageListener

* @Description TODO

* @date 2023/2/21

*/

@Component

@Slf4j

public class DocumnetMessageListener implements MessageListener, Document> {

@Autowired

RestHighLevelClient client;

@Autowired

MongoTemplate mongoTemplate;

@Autowired

TransToEsService transToEsService;

@Override

public void onMessage(Message, Document> message) {

try {

//数据库

String databaseName = message.getProperties().getDatabaseName();

//集合名称

String collectionName = message.getProperties().getCollectionName();

//操作类型

String operationType = message.getRaw().getOperationType().getValue();

//位点token

String resumeToken = message.getRaw().getResumeToken().toJson();

//消息

Document fullDocument = message.getRaw().getFullDocument();

if(Lists.newArrayList("original_oplog_ckpt","original_mq_log","original_sys_login_log").contains(collectionName)){

return;

}

log.info("Received Message in collection: {},message raw: {}, message body:{}",

message.getProperties().getCollectionName(), message.getRaw(), message.getBody());

//消息主键

String messageId = message.getRaw().getDocumentKey().getObjectId("_id").getValue().toHexString();

Class clazz = Class.forName("com.iflytek.databus.entity."+ EntityUtils.toHump(collectionName.replace("original_","")));

//1. 处理消息 TODO 考虑放入消息中间件消费

if(Lists.newArrayList(OperationType.INSERT.getValue(),OperationType.UPDATE.getValue(),OperationType.REPLACE.getValue()).contains(operationType)){

Object claszzObj = GsonUtils.fromJsonToBean(fullDocument, clazz);

transToEsService.mongoToEs(collectionName, Lists.newArrayList(claszzObj),false);

}

if(OperationType.DELETE.getValue().equals(operationType)){

transToEsService.oplogDelTrigger(collectionName,messageId,clazz);

}

//2. 每次处理消息成功,更新resumeToken,以便于故障重启后能继续同步消息

OriginalOplogCkpt originalOplogCkpt = new OriginalOplogCkpt();

originalOplogCkpt.setId("oplog_check_point");

originalOplogCkpt.setMessageId(messageId);

originalOplogCkpt.setDatabaseName(databaseName);

originalOplogCkpt.setCollectionName(collectionName);

originalOplogCkpt.setOperationType(operationType);

originalOplogCkpt.setResumeToken(resumeToken);

originalOplogCkpt.setCreateTime(new Date());

mongoTemplate.save(originalOplogCkpt);

} catch (Exception e) {

e.printStackTrace();

log.error("Mongo消息实时监控处理失败,错误信息:{}",e.getStackTrace());

}

}

}

消息监听器中可以去实现自己对消息处理的逻辑,这里我的需求是将mg的变化同步到es中。

4. 动态的启停changestream

package com.iflytek.databus.service;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;

import org.springframework.stereotype.Service;

/**

* @author xyding6

* @Classname ChangeStreamService

* @Description TODO

* @date 2023/2/24

*/

@Service

@Slf4j

public class ChangeStreamService {

@Autowired

private MessageListenerContainer messageListenerContainer;

public void start(){

log.info("ChangeStreamService start");

messageListenerContainer.start();

}

public void stop(){

log.info("ChangeStreamService stop MessageListenerContainer");

messageListenerContainer.stop();

}

}

可以在容器运行过程中,去操作changestream的启停。自己可以编写Controller来进行调用。

到此这篇关于springboot整合mongodb changestream的示例代码的文章就介绍到这了,如果项目对性能和延迟性要求不是太高可以选中changestream进行实现,否则还是推荐使用阿里开源的MongoShake.

相关阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: