文章目录

一、同步实现思路1、方案一:同步调用2、方案二:异步通知3、方案三:监听binlog

二、实现ES与MySQL数据同步1、导入hotel-admin工程2、项目分析3、SpringAMQP整合4、声明队列和交换机5、发送消息MQ6、监听MQ消息7、测试同步功能

一、同步实现思路

elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。

1、方案一:同步调用

操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用

同步调用方式下,业务耦合太多。

2、方案二:异步通知

引入消息队列,hotel-admin将数据写入mysql,并且自己再往MQ发条消息,说"数据更新了",任务就算完成,至于后面的hotel-demo什么时候更新ES,花费了多久,那是hotel-demo自己的事情。

3、方案三:监听binlog

使用canal中间件去监听mysql的binlog,当binlog发生改变,就通知hotel-demo,和上面不同的时,更加解放了hotel-admin这个上游服务,它往mysql写完数据就算任务完成,不用发消息,也不用调用其他服务,达到了完全解耦合。 其实mysql的binlog发生改变,搭配cancel,就像方案二的hotel-admin服务发了一条消息到MQ。

三种实现方式的对比:

二、实现ES与MySQL数据同步

1、导入hotel-admin工程

启动服务,访问localhost:{spring.service.port}

在hotel-admin服务中,模拟MySQL数据的增删改查。

2、项目分析

mysql的增删改动作需要同步到es中,但对es来说,增和改可以归为一类,就看文档id存在不存在了。接下来使用方案二,及MQ实现数据同步,思路如下;

声明exchange、queue、RoutingKey在hotel-admin中的增、删、改业务中完成消息发送在hotel-demo中完成消息监听,并更新elasticsearch中数据

模型如下:

3、SpringAMQP整合

引入依赖

org.springframework.boot

spring-boot-starter-amqp

临时启动个rabbitmq

docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# 访问host:15672,用户和密码为默认的guest

在hotel-admin中的application.yml,添加mq连接信息

spring:

rabbitmq:

host: 192.168.150.101 # 主机名

port: 5672 # 端口

virtual-host: / # 虚拟主机

username: guest # 用户名

password: guest # 密码

最后,记得给消费方也引入AMQP依赖,并添加上mq的连接信息

4、声明队列和交换机

在常量目录下定义队列和交换机的名字

package cn.llg.hotel.constants;

public class HotelMqConstants {

//交换机名称

public static final String EXCHANGE_NAME = "hotel.topic";

//新增和修改队列

public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";

//删除队列

public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";

//RoutingKey

public static final String INSERT_KEY = "hotel.insert";

public static final String DELETE_KEY = "hotel.delete";

}

接下来声明队列和交换机,可以基于注解,也可以基于Bean,后者复杂些,这里演示后者

package cn.llg.hotel.config;

import cn.llg.hotel.constants.HotelMqConstants;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @date 2023/7/12

*/

@Configuration

public class MqConfig {

@Bean

public TopicExchange topicExchange(){

return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);

}

@Bean

public Queue insertQueue(){

return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);

}

@Bean

public Queue deleteQueue(){

return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);

}

/**

* 绑定队列和交换机关系

*/

@Bean

public Binding insertQueueBinding(){

return BindingBuilder

.bind(insertQueue())

.to(topicExchange())

.with(HotelMqConstants.INSERT_KEY);

}

@Bean

public Binding deleteQueueBinding(){

return BindingBuilder

.bind(deleteQueue())

.to(topicExchange())

.with(HotelMqConstants.DELETE_KEY);

}

}

5、发送消息MQ

注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:

交换机名称routingKey消息内容,这里消息体尽量小些,别把一整个对象发过去

package cn.llg.hotel.web;

import cn.llg.hotel.constants.HotelMqConstants;

import cn.llg.hotel.pojo.Hotel;

import cn.llg.hotel.pojo.PageResult;

import cn.llg.hotel.service.IHotelService;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.*;

import java.security.InvalidParameterException;

@RestController

@RequestMapping("hotel")

public class HotelController {

@Autowired

private IHotelService hotelService;

@Autowired

private RabbitTemplate rabbitTemplate;

@PostMapping

public void saveHotel(@RequestBody Hotel hotel){

// 新增酒店

hotelService.save(hotel);

// 发送MQ消息,MQ是基于内存的,你把整个酒店对象hotel发过去很容易占满队列,发个主键ID就好,消息体尽量小些

rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());

}

@PutMapping()

public void updateById(@RequestBody Hotel hotel){

if (hotel.getId() == null) {

throw new InvalidParameterException("id不能为空");

}

hotelService.updateById(hotel);

// 发送MQ消息

rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());

}

@DeleteMapping("/{id}")

public void deleteById(@PathVariable("id") Long id) {

hotelService.removeById(id);

// 发送MQ消息

rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);

}

//其他接口

@GetMapping("/list")

public PageResult hotelList(

@RequestParam(value = "page", defaultValue = "1") Integer page,

@RequestParam(value = "size", defaultValue = "1") Integer size

){

Page result = hotelService.page(new Page<>(page, size));

return new PageResult(result.getTotal(), result.getRecords());

}

@GetMapping("/{id}")

public Hotel queryById(@PathVariable("id") Long id){

return hotelService.getById(id);

}

}

6、监听MQ消息

hotel-demo整合完SpringAMQP后,在hotel-demo中监听消息。

新建类HotelListener类,并加@Component注解以Bean的形式管理

package cn.llg.hotel.mq;

import cn.llg.hotel.constants.HotelMqConstants;

import cn.llg.hotel.service.IHotelService;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**

* @date 2023/7/13

*/

@Component

public class HotelListener {

@Resource

IHotelService hotelService;

/**

* 监听酒店新增或者修改的业务

* id接受一个Long,因为发送过来的是一个Long id

* @param id 酒店ID

*/

@RabbitListener(queues = HotelMqConstants.INSERT_QUEUE_NAME)

public void listenHotelInsertAndUpdate(Long id){

hotelService.insertDocById(id);

}

/**

* 监听酒店删除业务

*/

@RabbitListener(queues = HotelMqConstants.DELETE_QUEUE_NAME)

public void listenHotelDelete(Long id){

hotelService.deleteDocById(id);

}

}

拿到MQ中的酒店id后,使用JavaHighLevelClient对象来更新ES数据

package cn.llg.hotel.service;

import cn.llg.hotel.domain.dto.RequestParams;

import cn.llg.hotel.domain.pojo.Hotel;

import cn.llg.hotel.domain.vo.PageResult;

import com.baomidou.mybatisplus.extension.service.IService;

public interface IHotelService extends IService {

void insertDocById(Long id);

void deleteDocById(Long id);

}

@Service

public class HotelService extends ServiceImpl implements IHotelService {

@Resource

RestHighLevelClient client;

@Override

public void insertDocById(Long id) {

try {

//0.根据ID查数据,并转为文档类型

Hotel hotel = getById(id);

HotelDoc hotelDoc = new HotelDoc(hotel);

//1.准备request

IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());

//2.准备DSL

request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);

//3.发送请求

client.index(request,RequestOptions.DEFAULT);

} catch (IOException e) {

throw new RuntimeException(e);

}

}

@Override

public void deleteDocById(Long id) {

try {

//1.准备request

DeleteRequest request = new DeleteRequest("hotel",id.toString());

//2.发送请求

client.delete(request,RequestOptions.DEFAULT);

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

最后补充下上面的Hotel和HotelDoc之间的转换关系:

@Data

@TableName("tb_hotel")

public class Hotel {

@TableId(type = IdType.INPUT)

private Long id;

private String name;

private String address;

private Integer price;

private Integer score;

private String brand;

private String city;

private String starName;

private String business;

private String longitude;

private String latitude;

private String pic;

}

@Data

@NoArgsConstructor

public class HotelDoc {

private Long id;

private String name;

private String address;

private Integer price;

private Integer score;

private String brand;

private String city;

private String starName;

private String business;

private String location;

private String pic;

//距离

private Object distance;

//是否充广告

private Boolean isAD;

//ES中的completion,后面存数组,这里可以对应成List

private List suggestion;

public HotelDoc(Hotel hotel) {

this.id = hotel.getId();

this.name = hotel.getName();

this.address = hotel.getAddress();

this.price = hotel.getPrice();

this.score = hotel.getScore();

this.brand = hotel.getBrand();

this.city = hotel.getCity();

this.starName = hotel.getStarName();

this.business = hotel.getBusiness();

this.location = hotel.getLatitude() + ", " + hotel.getLongitude();

this.pic = hotel.getPic();

if(this.business.contains("/")){

//此时business有多个值,需要分开后放入suggestion

String[] arr = this.business.split("/");

//添加元素

this.suggestion = new ArrayList<>();

Collections.addAll(this.suggestion,arr);

this.suggestion.add(this.brand);

}else{

this.suggestion = Arrays.asList(this.brand,this.business);

}

}

}

7、测试同步功能

重启两个服务,查看MQ:

点击队列查看详情,可以看到绑定交换机成功:

接下来去酒店管理页面修改一条酒店信息(我直接调接口了,不去页面改)

在酒店搜索页面搜一下:

可以看到ES数据跟随MySQL更新成功!

参考阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: