前言

       今天要跟大家分享的是监控数据变化,实现自己的业务的另一个思路,基于数据库的binglog。我这里是用的Binlog4j实现,希望看总结的,直接看最后。

一、Binlog4j是什么?

       Binlog4j是轻量级 Mysql Binlog 客户端, 提供宕机续读, 高可用集群等特性等等。具体看一看它的官网,目前已经出到1.9.0版本,项目加入了Dromara 社区。

二、使用步骤

先说下我这里用上这个的原因:

多个数据增删改触发重新计算不想在每个业务操作方法后调用重新计算不想aop切面处理,实现重新计算不想用springboot的事件、监听实现重新计算骨子里想创新,接受新事物、新思路 所以我选择了它,下面我简单介绍下使用。

官网其实有demo,我这里是与springboot集成。

1.引入库

com.gitee.Jmysy

binlog4j-spring-boot-starter

1.9.0

       mysql的依赖、redis的依赖自行补充。

2.配置文件

spring:

binlog4j:

database: 要监听的数据库(一个实例上有多个库)

redis-config: #redis配置

host: ip

port: 端口

password: 密码

client-configs:

master:

username: 数据库用户

password: 密码

host: 数据库ip

port: 端口

serverId: 1990

配置说明

timeOffset 时间偏移量, 单位:毫秒serverId 编号redisConfig Redis 配置信息, 详见 RedisConfiginaugural 首次启动, 如果为 true 在启动时不再读取 Redis 记录persistence 是否启用持久化, 默认为 falsestrict 严格模式, 默认为 truemode 模式, 详见: BinlogClientModeusername 数据库账户password 数据库密码host 数据库所在服务器 IP 地址port 数据库占用端口, 默认 3306hikariConfig 数据库连接池配置

3.统一监听处理

MyBinlogEventHandler

import cn.hutool.core.bean.BeanUtil;

import cn.hutool.core.bean.copier.CopyOptions;

import cn.hutool.core.date.StopWatch;

import cn.hutool.json.JSONUtil;

import com.gitee.Jmysy.binlog4j.core.BinlogEvent;

import com.gitee.Jmysy.binlog4j.core.IBinlogEventHandler;

import com.gitee.Jmysy.binlog4j.springboot.starter.annotation.BinlogSubscriber;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Value;

import site.morn.rest.RestBuilders;

import site.morn.rest.RestMessage;

import javax.annotation.Resource;

/**

* binlog事件处理器

* 连接数据的用户需要有binlog读权限

*

* @author zwmac

*/

@Slf4j

@BinlogSubscriber(clientName = "master")

public class MyBinlogEventHandler implements IBinlogEventHandler {

@Value("${spring.binlog4j.database:linkappdb}")

public String monitorDatabase;

@Resource

private ProgressWarnService progressWarnService;

@Override

public void onInsert(BinlogEvent binlogEvent) {

//log.info("数据库:" + binlogEvent.getDatabase());

//log.info("数据表:" + binlogEvent.getTable());

//log.info("插入数据:" + binlogEvent.getData());

//需要重新计算场景

//1、插入设置 app_progress_warn_config

//2、新增实际进度详情 app_progress_real_detail

//3、新增进度计划任务(子节点)app_progress_info

RestMessage restMessage = RestBuilders.successMessage();

CalProgressWarnVo calVo = new CalProgressWarnVo();

calVo.setType(1);

String tableName = binlogEvent.getTable();

if ("app_progress_warn_config".equals(tableName)) {

StopWatch sw = new StopWatch();

sw.start();

calVo.setDataType(1);

ProgressWarnConfig progressWarnConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());

calVo.setNewData(progressWarnConfig);

restMessage = progressWarnService.calProgressWarn(calVo);

sw.stop();

log.info("新增[进度预警配置]数据触发binlog事件执行结果:{}-耗时:{}ms", restMessage.getCode(), sw.getTotalTimeMillis());

}

if ("app_progress_real_detail".equals(tableName)) {

StopWatch sw = new StopWatch();

sw.start();

calVo.setDataType(2);

ProgressRealDetail realDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());

calVo.setNewData(realDetail);

restMessage = progressWarnService.calProgressWarn(calVo);

sw.stop();

log.info("新增[实际进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());

}

if ("app_progress_info".equals(tableName)) {

StopWatch sw = new StopWatch();

sw.start();

Object progressInfoObj = binlogEvent.getData();

ProgressInfo progressInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());

if (progressInfo.getType() == 2) {

//计划任务才需要重新计算

calVo.setDataType(3);

calVo.setNewData(progressInfo);

restMessage = progressWarnService.calProgressWarn(calVo);

}

sw.stop();

log.info("新增[进度任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());

}

}

@Override

public void onUpdate(BinlogEvent binlogEvent) {

//log.info("数据库:" + binlogEvent.getDatabase());

//log.info("数据表:" + binlogEvent.getTable());

//log.info("原数据:" + binlogEvent.getOriginalData());

//log.info("新数据:" + binlogEvent.getData());

//需要重新计算场景

//1、设置表变更 app_progress_warn_config

//2、进度详情记录变更 app_progress_real_detail

//3、进度计划任务变更(计划开始时间、计划结束时间、工期)app_progress_info

RestMessage restMessage = null;

CalProgressWarnVo calVo = new CalProgressWarnVo();

calVo.setType(2);

String tableName = binlogEvent.getTable();

if ("app_progress_warn_config".equals(tableName)) {

StopWatch sw = new StopWatch();

sw.start();

calVo.setDataType(1);

ProgressWarnConfig oldConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());

ProgressWarnConfig newConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());

calVo.setNewData(newConfig);

calVo.setOldData(oldConfig);

restMessage = progressWarnService.calProgressWarn(calVo);

sw.stop();

log.info("修改[进度预警配置]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());

}

if ("app_progress_real_detail".equals(tableName)) {

StopWatch sw = new StopWatch();

sw.start();

calVo.setDataType(2);

ProgressRealDetail oldDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());

ProgressRealDetail newDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());

calVo.setNewData(newDetail);

calVo.setOldData(oldDetail);

restMessage = progressWarnService.calProgressWarn(calVo);

sw.stop();

log.info("修改[进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());

}

if ("app_progress_info".equals(tableName)) {

StopWatch sw = new StopWatch();

sw.start();

calVo.setDataType(3);

ProgressInfo oldInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());

ProgressInfo newInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());

calVo.setNewData(newInfo);

calVo.setOldData(oldInfo);

restMessage = progressWarnService.calProgressWarn(calVo);

sw.stop();

log.info("修改[进度计划任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());

}

}

@Override

public void onDelete(BinlogEvent binlogEvent) {

//log.info("数据库:" + binlogEvent.getDatabase());

//log.info("数据表:" + binlogEvent.getTable());

//log.info("删除数据:" + binlogEvent.getData());

//需要重新计算场景

//1、删除进度详情记录 app_progress_real_detail

//2、删除进度任务(子节点)app_progress_info

RestMessage restMessage = null;

CalProgressWarnVo calVo = new CalProgressWarnVo();

calVo.setType(3);

String tableName = binlogEvent.getTable();

if ("app_progress_real_detail".equals(tableName)) {

StopWatch sw = new StopWatch();

sw.start();

calVo.setDataType(2);

ProgressRealDetail oldDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());

calVo.setOldData(oldDetail);

restMessage = progressWarnService.calProgressWarn(calVo);

sw.stop();

log.info("删除[进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());

}

if ("app_progress_info".equals(tableName)) {

StopWatch sw = new StopWatch();

sw.start();

calVo.setDataType(3);

ProgressInfo oldInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());

calVo.setOldData(oldInfo);

restMessage = progressWarnService.calProgressWarn(calVo);

sw.stop();

log.info("删除[进度计划任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());

}

}

@Override

public boolean isHandle(String database, String table) {

//log.info("database:{},table:{}", database, table);

//只监控aep数据库

if (monitorDatabase.equals(database)) {

return true;

}

return false;

}

}

CalProgressWarnVo

import lombok.Data;

/**

* 计算进度预警参数Vo

*

* @author zwmac

*/

@Data

public class CalProgressWarnVo {

/**

* 类型:1新增,2变更,3删除

*/

private Integer type;

/**

* 数据类型:1进度预警配置,2进度详情,3进度计划任务

*/

private Integer dataType;

/**

* 旧数据

*/

private Object oldData;

/**

* 新数据

*/

private Object newData;

}

       从入参可以看出3类数据增删改都触发重新计算,这里的oldData、newData可以直接用于修改的时候传参,不用在查一次数据库。

progressWarnService.calProgressWarn(calVo);就是重新计算的具体实现了,这就涉及到业务了,各位自己实现。

总结

配置的账号要有binlog的读权限项目在跑,直接用其他工具操作数据库,也可以触发(这就是监听binlog的美妙)其他项目操作本项目的表,也可以监听到(原理同上)统一入口,不用有aop、事件、业务调用那么多入口需要考虑        好了,就写到这里,希望可以帮到大家,拥抱新事物,uping!

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: