使用 RabbitMQ 在 Spring Boot 应用中实现消息队列

消息队列(Message Queue,MQ)是一种用于在应用程序之间传递消息的通信机制,常用于解耦和异步处理。在 Java Spring Boot 应用中,结合 RabbitMQ 可以轻松实现消息队列功能。本文将介绍消息队列的现状与作用,讨论 RabbitMQ 的三种交换机类型,并详细解读 RabbitMQ 的配置文件。我们还会为每种交换机类型提供示例,包括监听者和消息发送测试。

消息队列的作用

消息队列是现代应用架构中的重要组成部分,其主要作用包括:

解耦:将不同模块之间的通信解耦,降低模块间的依赖性,提高系统的可维护性和可扩展性。异步处理:将耗时的操作转化为异步任务,提高系统的响应速度和吞吐量。削峰填谷:平衡系统负载,防止突发请求导致系统崩溃。数据同步:在分布式系统中保持数据一致性。消息通知:用于实现事件驱动的架构,如用户注册、订单支付等。

RabbitMQ 交换机的三大类型

RabbitMQ 中的交换机用于消息的路由分发,共有四种类型,以下主要讲解常用的三种类型:

直连交换机(Direct Exchange):根据消息的路由键(Routing Key)将消息投递到与之完全匹配的队列。适用于一对一的消息通信。 生产者配置 @Configuration

public class RabbitMqConfig {

@Bean

AmqpAdmin amqpAdmin(CachingConnectionFactory cachingConnectionFactory) {

AmqpAdmin amqpAdmin = new RabbitAdmin(cachingConnectionFactory);

amqpAdmin.declareExchange(directExchange());

amqpAdmin.declareQueue(directQueueA());

amqpAdmin.declareQueue(directQueueB());

amqpAdmin.declareBinding(directBindingA());

amqpAdmin.declareBinding(directBindingB());

return amqpAdmin;

}

DirectExchange directExchange() {

return new DirectExchange("directExchange", true, false);

}

Queue directQueueA() {

return new Queue("direct", true, false, false);

}

Queue directQueueB() {

return new Queue("direct1", true, false, false);

}

Binding directBindingA() {

return BindingBuilder.bind(directQueueA()).to(directExchange()).with("directKey");

}

Binding directBindingB() {

return BindingBuilder.bind(directQueueB()).to(directExchange()).with("directKeyB");

}

消费监听者 @Component

@Slf4j

public class DirectConsumer {

@RabbitListener(queues = "direct")

public void processA(Map map, Channel channel, Message message) throws IOException {

log.info("收到direct队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = "direct1")

public void process1(Map map, Channel channel, Message message) throws IOException {

log.info("收到directB队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

}

测试消息发送 @Test

public void directSend() {

Map map = new HashMap<>(6);

map.put("messageId", UUID.randomUUID().toString().replace("-", ""));

map.put("data", "hello rabbitmq direct!");

map.put("time", LocalDateTime.now());

rabbitTemplate.convertAndSend("directExchange", "directKey", map);

rabbitTemplate.convertAndSend("directExchange", "directKeyB", map);

}

程序输出 收到directB队列的消息:{data=hello rabbitmq direct!, messageId=255e6b955ae1474c857fa2616d9931da, time=2023-08-15T16:32:36.113}

收到direct队列的消息:{data=hello rabbitmq direct!, messageId=255e6b955ae1474c857fa2616d9931da, time=2023-08-15T16:32:36.113}

主题交换机(Topic Exchange):根据模式匹配路由键将消息投递到多个队列。适用于灵活的消息通信,支持通配符匹配。 生产者配置 @Configuration

public class RabbitMqConfig {

@Bean

AmqpAdmin amqpAdmin(CachingConnectionFactory cachingConnectionFactory) {

AmqpAdmin amqpAdmin = new RabbitAdmin(cachingConnectionFactory);

amqpAdmin.declareExchange(topic123Exchange());

amqpAdmin.declareExchange(topicAbcExchange());

amqpAdmin.declareQueue(topicQueue1());

amqpAdmin.declareQueue(topicQueue2());

amqpAdmin.declareQueue(topicQueue3());

amqpAdmin.declareQueue(topicQueueA());

amqpAdmin.declareQueue(topicQueueB());

amqpAdmin.declareQueue(topicQueueC());

amqpAdmin.declareBinding(topicBindingA());

amqpAdmin.declareBinding(topicBindingB());

amqpAdmin.declareBinding(topicBindingC());

amqpAdmin.declareBinding(topicBinding1());

amqpAdmin.declareBinding(topicBinding2());

amqpAdmin.declareBinding(topicBinding3());

return amqpAdmin;

}

TopicExchange topicAbcExchange() {

return new TopicExchange("topicExchangeABC", true, false);

}

TopicExchange topic123Exchange() {

return new TopicExchange("topicExchange123", true, false);

}

Queue topicQueueA() {

return new Queue("topicQueueA", true, false, false);

}

Queue topicQueueB() {

return new Queue("topicQueueB", true, false, false);

}

Queue topicQueueC() {

return new Queue("topicQueueC", true, false, false);

}

Queue topicQueue1() {

return new Queue("topicQueue1", true, false, false);

}

Queue topicQueue2() {

return new Queue("topicQueue2", true, false, false);

}

Queue topicQueue3() {

return new Queue("topicQueue3", true, false, false);

}

Binding topicBindingA() {

return BindingBuilder.bind(topicQueueA()).to(topicAbcExchange()).with("topic.#");

}

Binding topicBindingB() {

return BindingBuilder.bind(topicQueueB()).to(topicAbcExchange()).with("topic.*");

}

Binding topicBindingC() {

return BindingBuilder.bind(topicQueueC()).to(topicAbcExchange()).with("topic.b");

}

Binding topicBinding1() {

return BindingBuilder.bind(topicQueue1()).to(topic123Exchange()).with("topic.*");

}

Binding topicBinding2() {

return BindingBuilder.bind(topicQueue2()).to(topic123Exchange()).with("topic.2");

}

Binding topicBinding3() {

return BindingBuilder.bind(topicQueue3()).to(topic123Exchange()).with("topic.3");

}

}

消费监听者 @Slf4j

@Component

public class TopicConsumer {

@RabbitListener(queues = "topicQueueA")

public void processA(Map map, Channel channel, Message message) throws IOException {

log.info("收到topicA队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = "topicQueueB")

public void processB(Map map, Channel channel, Message message) throws IOException {

log.info("收到topicB队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = "topicQueueC")

public void processC(Map map, Channel channel, Message message) throws IOException {

log.info("收到topicC队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = "topicQueue1")

public void process1(Map map, Channel channel, Message message) throws IOException {

log.info("收到topic1队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = "topicQueue2")

public void process2(Map map, Channel channel, Message message) throws IOException {

log.info("收到topic2队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = "topicQueue3")

public void process3(Map map, Channel channel, Message message) throws IOException {

log.info("收到topic3队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

}

测试消息发送 @Test

public void topicSend() {

Map map = new HashMap<>(6);

map.put("messageId", UUID.randomUUID().toString().replace("-", ""));

map.put("data", "hello rabbitmq topic!");

map.put("time", LocalDateTime.now());

rabbitTemplate.convertAndSend("topicExchangeABC", "topic.2", map);

}

程序输出 收到topicA队列的消息:{data=hello rabbitmq topic!, messageId=4736d433d8484a63a8d5a4a88a611d04, time=2023-08-15T16:37:19.948}

收到topicB队列的消息:{data=hello rabbitmq topic!, messageId=4736d433d8484a63a8d5a4a88a611d04, time=2023-08-15T16:37:19.948}

扇形交换机(Fanout Exchange):将消息广播到与之绑定的所有队列,无视路由键。适用于一对多的消息广播。 生产者配置 @Configuration

public class RabbitMqConfig {

@Bean

AmqpAdmin amqpAdmin(CachingConnectionFactory cachingConnectionFactory) {

AmqpAdmin amqpAdmin = new RabbitAdmin(cachingConnectionFactory);

amqpAdmin.declareExchange(fanoutExchange());

amqpAdmin.declareQueue(fanoutQueueA());

amqpAdmin.declareQueue(fanoutQueueB());

amqpAdmin.declareQueue(fanoutQueueC());

amqpAdmin.declareBinding(fanoutBindingA());

amqpAdmin.declareBinding(fanoutBindingB());

amqpAdmin.declareBinding(fanoutBindingC());

return amqpAdmin;

}

FanoutExchange fanoutExchange() {

return new FanoutExchange("fanoutExchange", true, false);

}

Queue fanoutQueueA() {

return new Queue("FanoutA", true, false, false);

}

Queue fanoutQueueB() {

return new Queue("FanoutB", true, false, false);

}

Queue fanoutQueueC() {

return new Queue("FanoutC", true, false, false);

}

Binding fanoutBindingA() {

return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());

}

Binding fanoutBindingB() {

return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());

}

Binding fanoutBindingC() {

return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());

}

}

消费监听者 @Slf4j

@Component

public class FanoutConsumer {

@RabbitListener(queues = "FanoutA")

public void processA(Map map, Channel channel, Message message) throws IOException {

log.info("收到fanoutA队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = "FanoutB")

public void processB(Map map, Channel channel, Message message) throws IOException {

log.info("收到fanoutB队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

@RabbitListener(queues = "FanoutC")

public void processC(Map map, Channel channel, Message message) throws IOException {

log.info("收到fanoutC队列的消息:" + map.toString());

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

}

测试消息发送 @Test

public void fanoutSend() {

Map map = new HashMap<>(6);

map.put("messageId", UUID.randomUUID().toString().replace("-", ""));

map.put("data", "hello rabbitmq fanout!");

map.put("time", LocalDateTime.now());

rabbitTemplate.convertAndSend("fanoutExchange", "", map);

}

程序输出 收到fanoutA队列的消息:{data=hello rabbitmq fanout!, messageId=421d000c0e3e4304bcb5778af237ea22, time=2023-08-15T16:39:20.999}

收到fanoutB队列的消息:{data=hello rabbitmq fanout!, messageId=421d000c0e3e4304bcb5778af237ea22, time=2023-08-15T16:39:20.999}

收到fanoutC队列的消息:{data=hello rabbitmq fanout!, messageId=421d000c0e3e4304bcb5778af237ea22, time=2023-08-15T16:39:20.999}

RabbitMQ 配置文件解读

以下是一个 Spring Boot 中 RabbitMQ 的配置文件示例 application.yml:

spring:

rabbitmq:

addresses: #RabbitMQ 服务器的地址列表,用逗号分隔,用于建立连接。如果不指定,会使用 host 和 port 参数来连接

cache: #缓存配置,用于缓存 RabbitMQ 的 channel 和 connection

channel: #缓存 channel 的相关配置

checkout-timeout: #从缓存中获取 channel 的超时时间

size: #缓存中保持的 channel 数量

connection: #缓存 connection 的相关配置

mode: #连接工厂缓存模式,可以是 channel 或 connection

size: #缓存的 channel 数量,只有在 CONNECTION 模式下生效

connection-timeout: #连接超时时间,单位毫秒,0 表示永不超时

host: #RabbitMQ 服务器的主机名,默认为 localhost

listener: #监听器的配置

direct: #直接消费者的配置

acknowledge-mode: #ack模式

auto-startup: #true 是否在启动时自动启动容器

consumers-per-queue: #每个队列消费者数量

default-requeue-rejected: #默认是否将拒绝传送的消息重新入队

idle-event-interval: #空闲容器事件发布时间间隔

missing-queues-fatal: #false若容器声明的队列在代理上不可用,是否失败

prefetch: #每个消费者可最大处理的nack消息数量

retry:

enabled: #false 是否启用发布重试机制

initial-interval: #1000ms 第一次和第二次尝试之间的持续时间传递消息

max-attempts: #3 发送消息的最大尝试次数

max-interval: #10000ms 尝试之间的最大持续时间

multiplier: #1 适用于先前重试间隔的乘数

stateless: #true 重试是无状态的还是有状态的

simple: #简单消费者的配置

acknowledge-mode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto

auto-startup: #true 是否启动时自动启动容器

concurrency: #最小的消费者数量

default-requeue-rejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)

idle-event-interval: #发布空闲容器的时间间隔,单位毫秒

max-concurrency: #最大的消费者数量

missing-queues-fatal: #true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器

prefetch: #一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量

retry:

enabled: #false 监听重试是否可用

initial-interval: #1000ms:第一次和第二次尝试传递消息的时间间隔

max-attempts: #3 最大重试次数

max-interval: #10000ms 最大重试时间间隔

multiplier: #1 应用于上一重试间隔的乘数

stateless: #true 重试时有状态or无状态

transaction-size: #当ack模式为auto时,一个事务(ack间)处理的消息数量,最好是小于等于prefetch的数量.若大于prefetch则prefetch将增加到这个值

type: #simple 容器类型.simple或direct

password: #连接 RabbitMQ 服务器所需的密码

port: #RabbitMQ 服务器的端口,默认为 5672

publisher-confirms: #是否启用发布确认机制,默认为 false

publisher-returns: #是否启用发布返回机制,默认为 false

requested-heartbeat: #请求心跳超时时间,单位秒,0 表示不指定

ssl:

algorithm: #ssl使用的算法,默认由rabiitClient配置

enabled: #是否支持ssl,默认false

key-store: #持有SSL certificate的key store的路径

key-store-password: #访问key store的密码

trust-store: #持有SSL certificates的Trust store

trust-store-password: #访问trust store的密码

trust-store-type: #JKS:Trust store 类型.

validate-server-certificate: #true:是否启用服务端证书验证

verify-hostname: #true 是否启用主机验证

template: #RabbitTemplate 的配置

mandatory: #是否启用强制信息,默认为 false

receive-timeout: #receive() 操作的超时时间

reply-timeout: #sendAndReceive() 操作的超时时间

retry:

enabled: #false 发送重试是否可用

initial-interva: #第一次和第二次尝试发布或传递消息之间的间隔

max-attempts: #最大重试次数

max-interval: #最大重试时间间隔

multiplier: #应用于上一重试间隔的乘数

username: #连接 RabbitMQ 服务器所需的用户名

virtual-host: #连接到 RabbitMQ 服务器时使用的虚拟主机

参考文档

官方文档

springboot-AMQP官方文档

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: