RabbitMQ是什么?

MQ全称为Message Queue,即消息队列:程序之间通过发送消息来通信,而不是通过彼此直接调用来通信。 RabbitMQ 主要是为了实现系统之间的双向解耦。当生产者大量产生数据而消费者无法快速消费时,就需要一个中间层来保存这些数据。

为什么使用RabbitMQ?

优点: 1、实现应用系统的解耦,客户端只关心发送消息,而不关心消息如何被处理。 2、异步提升效率,在主业务逻辑中发送消息,由异步流程去处理消息。 3、流量削峰,将请求先放到MQ消息队列中,消费端再按MySQL能够承受的速率拉取请求进行消费,避免请求一下子全部打到MySQL,导致其因请求过多而崩溃。

怎么使用RabbitMQ?

1.在Windows上安装RabbitMQ(及其依赖的Erlang环境),参考链接3

2.java 代码引入相关jar包

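<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>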

3.编写发送,接收消息的工具类

延迟队列配置

package com.next.mq;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for delayed messaging: declares the delayed exchange,
 * the queue that receives delayed messages, and the binding between them.
 */
@Configuration
public class RabbitDelayMqConfig {

    /**
     * Declares the direct exchange used for delayed messages.
     *
     * @return a durable, non-auto-delete direct exchange with the delayed flag set
     */
    @Bean("delayDirectExchange")
    public DirectExchange delayDirectExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        DirectExchange exchange = new DirectExchange(QueueConstants.DELAY_EXCHANGE, durable, autoDelete);
        // The exchange itself must be marked as delayed; without it the delay does not take effect.
        exchange.setDelayed(true);
        return exchange;
    }

    /**
     * Declares the queue that delayed messages are routed to.
     *
     * @return the queue named by {@code QueueConstants.DELAY_QUEUE}
     */
    @Bean("delayNotifyQueue")
    public Queue delayNotifyQueue() {
        Queue queue = new Queue(QueueConstants.DELAY_QUEUE);
        return queue;
    }

    /**
     * Binds the delay queue to the delay exchange under the delay routing key.
     *
     * @param delayDirectExchange the exchange bean named {@code delayDirectExchange}
     * @param delayNotifyQueue    the queue bean named {@code delayNotifyQueue}
     * @return the binding of queue to exchange with {@code QueueConstants.DELAY_ROUTING}
     */
    @Bean("delayBindingNotify")
    public Binding delayBindingNotify(@Qualifier("delayDirectExchange") DirectExchange delayDirectExchange,
                                      @Qualifier("delayNotifyQueue") Queue delayNotifyQueue) {
        Binding binding = BindingBuilder
                .bind(delayNotifyQueue)
                .to(delayDirectExchange)
                .with(QueueConstants.DELAY_ROUTING);
        return binding;
    }
}

队列配置

package com.next.mq;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Primary;

/**
 * Spring configuration for ordinary (non-delayed) messaging: declares the common
 * exchange, the common queue, and the binding between them. All three beans are
 * marked {@code @Primary}.
 */
@Configuration
public class RabbitMqConfig {

    /**
     * Declares the common direct exchange.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean("directExchange")
    @Primary
    public DirectExchange directExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        DirectExchange exchange = new DirectExchange(QueueConstants.COMMON_EXCHANGE, durable, autoDelete);
        return exchange;
    }

    /**
     * Declares the common queue.
     *
     * @return the queue named by {@code QueueConstants.COMMON_QUEUE}
     */
    @Bean("notifyQueue")
    @Primary
    public Queue notifyQueue() {
        Queue queue = new Queue(QueueConstants.COMMON_QUEUE);
        return queue;
    }

    /**
     * Binds the common queue to the common exchange under the common routing key.
     *
     * @param directExchange the exchange bean named {@code directExchange}
     * @param notifyQueue    the queue bean named {@code notifyQueue}
     * @return the binding of queue to exchange with {@code QueueConstants.COMMON_ROUTING}
     */
    @Bean("bindingNotify")
    @Primary
    public Binding bindingNotify(@Qualifier("directExchange") DirectExchange directExchange,
                                 @Qualifier("notifyQueue") Queue notifyQueue) {
        Binding binding = BindingBuilder
                .bind(notifyQueue)
                .to(directExchange)
                .with(QueueConstants.COMMON_ROUTING);
        return binding;
    }
}

发送消息工具类

package com.next.mq;

import com.next.util.JsonMapper;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageDeliveryMode;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.util.UUID;

/**

* @desc 客户端工具类 -- 发送消息

*/

@Component

@Slf4j

public class RabbitMqClient {

@Resource

private RabbitTemplate rabbitTemplate;

//发送同步消息

public void send(MessageBody messageBody) {

try {

//生成唯一的消息id

String uuid = UUID.randomUUID().toString();

//初始话消息

CorrelationData correlationData = new CorrelationData(uuid);

//使用模板工具类rabbitTemplate 来发消息

rabbitTemplate.convertAndSend(QueueConstants.COMMON_EXCHANGE, QueueConstants.COMMON_ROUTING,

JsonMapper.obj2String(messageBody), new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 消息持久化

message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

//记录日志

log.info("message send, {}", message);

return message;

}

}, correlationData);

} catch (Exception e) {

//日志打印,以便定位问题

log.error("message send exception, msg:{}", messageBody.toString(), e);

}

}

/**

* @desc 发送延迟消息

*/

public void sendDelay(MessageBody messageBody, int delayMillSeconds) {

try {

//设置消息延迟时间

messageBody.setDelay(delayMillSeconds);

String uuid = UUID.randomUUID().toString();

CorrelationData correlationData = new CorrelationData(uuid);

//延迟交换机和路由

rabbitTemplate.convertAndSend(QueueConstants.DELAY_EXCHANGE, QueueConstants.DELAY_ROUTING,

JsonMapper.obj2String(messageBody), new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化

//设置消息延迟的时间(毫秒值)

message.getMessageProperties().setDelay(delayMillSeconds);

log.info("delay message send, {}", message);

return message;

}

}, correlationData);

} catch (Exception e) {

log.error("delay message send exception, msg:{}", messageBody.toString(), e);

}

}

}

接收消息工具类

package com.next.mq;

import com.next.dto.RollbackSeatDto;

import com.next.model.TrainOrder;

import com.next.service.TrainOrderService;

import com.next.service.TrainSeatService;

import com.next.util.JsonMapper;

import lombok.extern.slf4j.Slf4j;

import org.codehaus.jackson.type.TypeReference;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * RabbitMQ consumer for the delay queue.
 *
 * <p>Purpose: the main flow publishes a message and this listener handles it
 * asynchronously, keeping the work off the main flow.
 */
@Component
@Slf4j
public class RabbitDelayMqServer {

    @Resource
    private TrainSeatService trainSeatService;

    @Resource
    private TrainOrderService trainOrderService;

    /**
     * Handles one message from the delay queue.
     *
     * <p>The message is parsed into a {@code MessageBody} and dispatched on its topic:
     * seat rollback messages go to {@code TrainSeatService.batchRollbackSeat}, order
     * payment checks go to {@code TrainOrderService.delayCheckOrder}, and any other
     * topic is logged and ignored. Any exception raised during handling is logged and
     * swallowed, so this method never rethrows.
     *
     * @param message the raw message text, expected to be a JSON-serialized {@code MessageBody}
     */
    @RabbitListener(queues = QueueConstants.DELAY_QUEUE)
    public void receive(String message) {
        log.info("delay queue receive message, {}", message);
        try {
            // TypeReference must carry its actual type argument: a raw TypeReference
            // gives Jackson no target type to deserialize into.
            MessageBody messageBody = JsonMapper.string2Obj(message, new TypeReference<MessageBody>() {
            });
            if (messageBody == null) {
                return;
            }
            switch (messageBody.getTopic()) {
                case QueueTopic.SEAT_PLACE_ROLLBACK:
                    RollbackSeatDto dto = JsonMapper.string2Obj(messageBody.getDetail(),
                            new TypeReference<RollbackSeatDto>() {
                            });
                    trainSeatService.batchRollbackSeat(dto.getTrainSeat(), dto.getFromStationIdList(),
                            messageBody.getDelay());
                    break;
                case QueueTopic.ORDER_PAY_DELAY_CHECK:
                    TrainOrder trainOrder = JsonMapper.string2Obj(messageBody.getDetail(),
                            new TypeReference<TrainOrder>() {
                            });
                    trainOrderService.delayCheckOrder(trainOrder);
                    break;
                default:
                    log.warn("delay queue receive message, {}, no need handle", message);
            }
        } catch (Exception e) {
            log.error("delay queue message handle exception, msg:{}", message, e);
        }
    }
}

参考链接: 1.rabbitMQ到底是个啥东西? 2.超详细!!!Windows下安装RabbitMQ的步骤详解 3.windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法) 4.RabbitMQ安装或启动后,无法访问http://localhost:15672/

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: