使用背景
项目后台使用mongoDb,门户页使用elasticsearch,需求是将mongoDb的数据同步到elasticsearch中。于是联想到mysql有基础binlog开发的canal。mg是否有类似binlog这种机制实现的实时监控同步工具。
方案
MongoShake(项目地址:https://github.com/alibaba/MongoShake)ChangeStream (monggodb的3.6版本之后推出的)
MongoShake
MongoShake是一个以go语言编写的通用的平台型服务,通过读取MongoDB集群的Oplog日志, 对MongoDB的数据进行复制,后续通过操作日志实现特定需求。 MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。 阿里官方文档: https://developer.aliyun.com/article/763827?spm=a2c6h.13813017.content3.1.3eb032a4yx0ZoM#slide-11 项目地址: https://github.com/alibaba/MongoShake
ChangeStream
changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更 想必对mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具
关于changestream做如下说明,提供参考
在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群
changestream可用于监听的mongodb目标类型
单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持整个集群,整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持
一个Change Stream Event的基本结构如下所示:
{
_id : {
"operationType" : "
"fullDocument" : {
"ns" : {
"db" : "
"coll" : " }, "documentKey" : { "_id" : "updateDescription" : { "updatedFields" : { "removedFields" : [ " } "clusterTime" : "txnNumber" : "lsid" : { "id" : "uid" : } } 关于上面的数据结构,做简单的解释说明, _id,变更事件的Token对象operationType,变更类型(见下面介绍)fullDocument,文档内容ns,监听的目标ns.db,变更的数据库ns.coll,变更的集合documentKey,变更文档的键值,含_id字段updateDescription,变更描述updateDescription.updatedFields,变更中更新字段updateDescription.removedFields,变更中删除字段clusterTime,对应oplog的时间戳txnNumber,事务编号,仅在多文档事务中出现,4.0版本支持lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持 Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个: insert,插入文档delete,删除文档replace,替换文档,当执行replace操作指定upsert时,可能是insert事件update,更新文档,当执行update操作指定upsert时,可能是insert事件invalidate,失效事件,比如执行了collection.drop或collection.rename 以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等 Java客户端操作changestream 1、引入maven依赖 2、注册MessageListenerContainer package com.iflytek.databus.conf; import com.iflytek.databus.entity.OriginalOplogCkpt; import com.iflytek.databus.init.DocumnetMessageListener; import com.mongodb.client.model.changestream.FullDocument; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.bson.BsonDocument; import org.bson.Document; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest; import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer; import java.util.concurrent.Executor; import static org.springframework.data.mongodb.core.aggregation.Aggregation.match; import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation; import static org.springframework.data.mongodb.core.query.Criteria.where; /** * @author xyding6 * @Classname MongoConfig * @Description TODO * @date 2023/2/21 */ @Configuration @Slf4j public class MongoConfig { @Value("${spring.data.mongodb.database}") String mongoDatabase; @Bean MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumnetMessageListener documnetMessageListener,Executor asyncExecutor) { // Executor executor = Executors.newSingleThreadExecutor(); MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, asyncExecutor) { @Override public boolean isAutoStartup() { return true; } }; //oplog_check_point OriginalOplogCkpt originalOplogCkpt = template.findById("oplog_check_point", OriginalOplogCkpt.class, "original_oplog_ckpt"); ChangeStreamRequest.ChangeStreamRequestBuilder .database(mongoDatabase) // .collection("original_aydoc_test") //需要监听的集合名,不指定默认监听数据库的 .filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete")))) //过滤需要监听的操作类型,可以根据需求指定过滤条件 .fullDocumentLookup(FullDocument.UPDATE_LOOKUP);//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息 if(originalOplogCkpt != null && StringUtils.isNotBlank(originalOplogCkpt.getResumeToken())){ log.info("==mongo监听从位点处恢复执行,位点resumeToken:{}==",originalOplogCkpt.getResumeToken()); documentChangeStreamRequestBuilder.resumeToken(BsonDocument.parse(originalOplogCkpt.getResumeToken())); } ChangeStreamRequest request = documentChangeStreamRequestBuilder.build(); //TODO 测试暂不开启 messageListenerContainer.register(request, Document.class); return messageListenerContainer; } } 这一步是注册自己编写的监听器,oplog_check_point这是本地mg的一个集合,用来记录每次位点的信息。 3. 编写MessageListener package com.iflytek.databus.init; import com.google.common.collect.Lists; import com.iflytek.databus.entity.OriginalOplogCkpt; import com.iflytek.databus.service.TransToEsService; import com.iflytek.databus.utils.EntityUtils; import com.iflytek.databus.utils.GsonUtils; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.OperationType; import lombok.extern.slf4j.Slf4j; import org.bson.Document; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.Message; import org.springframework.data.mongodb.core.messaging.MessageListener; import org.springframework.stereotype.Component; import java.util.Date; /** * @author xyding6 * @Classname DocumnetMessageListener * @Description TODO * @date 2023/2/21 */ @Component @Slf4j public class DocumnetMessageListener implements MessageListener @Autowired RestHighLevelClient client; @Autowired MongoTemplate mongoTemplate; @Autowired TransToEsService transToEsService; @Override public void onMessage(Message try { //数据库 String databaseName = message.getProperties().getDatabaseName(); //集合名称 String collectionName = message.getProperties().getCollectionName(); //操作类型 String operationType = message.getRaw().getOperationType().getValue(); //位点token String resumeToken = message.getRaw().getResumeToken().toJson(); //消息 Document fullDocument = message.getRaw().getFullDocument(); if(Lists.newArrayList("original_oplog_ckpt","original_mq_log","original_sys_login_log").contains(collectionName)){ return; } log.info("Received Message in collection: {},message raw: {}, message body:{}", message.getProperties().getCollectionName(), message.getRaw(), message.getBody()); //消息主键 String messageId = message.getRaw().getDocumentKey().getObjectId("_id").getValue().toHexString(); Class clazz = Class.forName("com.iflytek.databus.entity."+ EntityUtils.toHump(collectionName.replace("original_",""))); //1. 处理消息 TODO 考虑放入消息中间件消费 if(Lists.newArrayList(OperationType.INSERT.getValue(),OperationType.UPDATE.getValue(),OperationType.REPLACE.getValue()).contains(operationType)){ Object claszzObj = GsonUtils.fromJsonToBean(fullDocument, clazz); transToEsService.mongoToEs(collectionName, Lists.newArrayList(claszzObj),false); } if(OperationType.DELETE.getValue().equals(operationType)){ transToEsService.oplogDelTrigger(collectionName,messageId,clazz); } //2. 每次处理消息成功,更新resumeToken,以便于故障重启后能继续同步消息 OriginalOplogCkpt originalOplogCkpt = new OriginalOplogCkpt(); originalOplogCkpt.setId("oplog_check_point"); originalOplogCkpt.setMessageId(messageId); originalOplogCkpt.setDatabaseName(databaseName); originalOplogCkpt.setCollectionName(collectionName); originalOplogCkpt.setOperationType(operationType); originalOplogCkpt.setResumeToken(resumeToken); originalOplogCkpt.setCreateTime(new Date()); mongoTemplate.save(originalOplogCkpt); } catch (Exception e) { e.printStackTrace(); log.error("Mongo消息实时监控处理失败,错误信息:{}",e.getStackTrace()); } } } 消息监听器中可以去实现自己对消息处理的逻辑,这里我的需求是将mg的变化同步到es中。 4. 动态的启停changestream package com.iflytek.databus.service; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer; import org.springframework.stereotype.Service; /** * @author xyding6 * @Classname ChangeStreamService * @Description TODO * @date 2023/2/24 */ @Service @Slf4j public class ChangeStreamService { @Autowired private MessageListenerContainer messageListenerContainer; public void start(){ log.info("ChangeStreamService start"); messageListenerContainer.start(); } public void stop(){ log.info("ChangeStreamService stop MessageListenerContainer"); messageListenerContainer.stop(); } } 可以在容器运行过程中,去操作changestream的启停。自己可以编写Controller来进行调用。 到此这篇关于springboot整合mongodb changestream的示例代码的文章就介绍到这了,如果项目对性能和延迟性要求不是太高可以选中changestream进行实现,否则还是推荐使用阿里开源的MongoShake. 相关阅读
发表评论