RabbitMQ 高级特性

5.1 消息可靠性

消息从发送,到消费者接收,会经理多个过程,如下所示: 其中的每一步都可能导致消息丢失,常见的丢失原因包括:

发送时丢失:

生产者发送的消息未送达 exchange消息到达 exchange 后未到达 queue MQ 宕机,queue 将消息丢失consumer 接收到消息后未消费就宕机

针对这些问题,RabbitMQ 分别给出了解决方案:

生产者确认机制mq 持久化消费者确认机制失败重试机制

我们以一个 Demo 进行演示:

5.1.1 生产者消息确认

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到 MQ 以后,会返回一个结果给发送者,表示消息是否处理成功。 返回结果有两种方式:

publisher-confirm,发送者确认

消息成功投递到交换机,返回 ack消息未投递到交换机,返回 nack publisher-return,发送者回执

消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,避免 ack 冲突

修改 publisher 服务中的配置:

spring:

rabbitmq:

publisher-confirm-type: correlated

publisher-returns: true

template:

mandatory: true

说明:

publish-confirm-type:开启 publisher-confirm,这里支持两种类型:

simple:同步等待 confirm 结果,直到超时correlated:异步回调,定义 ConfirmCallback ,MQ 返回结果时会回调这个 ConfirmCallback publish-returns:开启 publish-return 功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

定义回调函数

// 设置发送者确认回调函数

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

/**

* @param correlationData 自定义的数据 一般是消息的 UUID

* @param b 是否确认 true:消息发送到 exchange 中 false:消息未发送到 exchange 中

* @param s 原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

log.info("发送确认回调触发 消息的ID===> {}", correlationData.getId());

if (b) {

log.info("消息成功发送到交换机中!!!");

} else {

log.error("消息发送到交换机中失败!!!,原因:{}", s);

// 可以重发

}

}

});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

/**

* 只要这个方法被调用,代表消息没能正确路由到队列,被 mq 返还回来了

* @param message 返回的消息

* @param i 回复状态码

* @param s 回复内容

* @param s1 交换机

* @param s2 路由 key

*/

@Override

public void returnedMessage(Message message, int i, String s, String s1, String s2) {

log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",

i, s, s1, s2, message.toString());

// 如果有业务需要,可以重发

}

});

5.1.2 消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。 要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

交换机持久化队列持久化消息持久化

默认情况下,由 SpringAMQP 声明的交换机都是持久化的

@Bean

public FanoutExchange fanoutExchange() {

// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除

return new FanoutExchange("fanout.exchange", true, false);

}

由 SpringAMQP 声明的队列都是持久化的

@Bean

public Queue queue() {

return new Queue("fanout.queue");

}

利用 SpringAMQP 发送消息时,可以设置消息的属性(MessageProperties),指定 delivery-mode:

1:非持久化2:持久化

默认情况下,SpringAMQP 发出的任何消息都是持久化的,不用特意指定

@Test

public void testSendDurableMessage() throws InterruptedException {

// 1.消息体

Message message = MessageBuilder.

withBody("hello, spring amqp!".getBytes(StandardCharsets.UTF_8))

.setDeliveryMode(MessageDeliveryMode.PERSISTENT)

.build();

// 2.发送消息

rabbitTemplate.convertAndSend("simple.queue", message);

}

5.1.3 消费者消息确认

设想这样的场景:

RabbitMQ 投递消息给消费者消费者获取消息后,返回 ACK 给 RabbitMQRabbitMQ 删除消息消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回 ACK 的时机非常重要 而 SpringAMQP 则允许配置三种确认模式:

manual:手动 ack,需要在业务代码结束后,调用 ap i发送 ack。auto:自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回nacknone:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

none模式下,消息投递是不可靠的,可能丢失auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ackmanual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可 手动ack:

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: manual # 手动ack

@RabbitListener(

bindings = {

@QueueBinding(

value = @Queue,

exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),

key = {"black.*.#"}

)

}

)

public void getMessage3(String msg, Channel channel, Message message) throws IOException {

System.out.println("接收到消息3:" + msg);

// 手动 ack

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}

auto 模式 首先我们修改消费者的 yml 配置文件

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态) 抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:

5.1.4 消费失败重试机制

当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致 mq 的消息处理飙升,带来不必要的压力,我们怎么办? 我们可以利用 Spring 的 retry 机制(本地重试),在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。 修改 consumer 服务的 application.yml 文件,添加内容

spring:

rabbitmq:

listener:

simple:

retry:

enabled: true # 开启消费者失败重试

initial-interval: 1000ms # 初始的失败等待时长为1秒

multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval

max-attempts: 3 # 最大重试次数

stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启 consumer 服务,重复之前的测试。可以发现:

在重试3次后,SpringAMQP会抛出异常 AmqpRejectAndDontRequeueException,说明本地重试触发了查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是ack,mq删除消息了

由上述的发现可得知,开启本地重试后,最终消息还是会丢失,这个我们需要怎么解决? 这个时候我们可以自定义失败策略 在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由 Spring 内部机制决定的, 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecovery 接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队,这种也类似无限循环** RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机 **

比较优雅的一种处理方案是 RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。 在 consumer 中定义处理失败消息的交换机和队列

@Bean

public DirectExchange errorMessageExchange(){

return new DirectExchange("error.direct");

}

@Bean

public Queue errorQueue(){

return new Queue("error.queue", true);

}

@Bean

public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){

return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");

}

定义一个 RepublishMessageRecoverer,关联队列和交换机

@Bean

public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {

return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

}

5.2 死信交换机

5.2.1 什么是死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为false消息是一个过期消息,超时无人消费要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX) 另外,队列将死信投递给死信交换机时,必须知道两个信息:

死信交换机名称死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列 在失败重试策略中,默认的 RejectAndDontRequeueRecoverer 会在本地重试次数耗尽后,发送 reject 给RabbitMQ,消息变成死信,被丢弃。 我们可以给 simple.queue 添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列,这也是一种防止信息丢失的方法,不过我们经常用的是失败策略,而不用死信交换机,原因是因为配置麻烦

@Bean

public Queue simpleQueue() {

// 配置死信交换机

return QueueBuilder.durable("simple.queue")

.deadLetterExchange("dl.exchange")

.deadLetterRoutingKey("dl")

.build();

}

@Bean

public Queue dlQueue() {

return new Queue("dl.queue");

}

@Bean

public DirectExchange dlExchange() {

return new DirectExchange("dl.exchange");

}

@Bean

public Binding dlBinding() {

return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");

}

5.2.2 TTL

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况

消息所在的队列设置了超时时间消息本身设置了超时时间

/**

* 基于注解方式声明一组死信交换机和队列

*/

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "dl.queue"),

exchange = @Exchange(name = "dl.direct"),

key = "dl"

))

public void listenDlQueue(String msg) {

log.info("接收到 ttl.queue的延迟消息:{}", msg);

}

@Bean

public DirectExchange ttlExchange() {

return new DirectExchange("ttl.direct");

}

@Bean

public Queue ttlQueue() {

return QueueBuilder.durable("ttl.queue")

.ttl(10000) // 设置队列的超时时间 10s

.deadLetterExchange("dl.direct") // 指定死信交换机

.deadLetterRoutingKey("dl") // 指定死信 RoutingKey

.build();

}

@Bean

public Binding ttlBinding() {

return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");

}

@Test

public void testTTLMsg() {

// 创建消息

Message message = MessageBuilder

.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))

.setExpiration("5000") // 设置消息的过期时间 5s

.build();

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);

}

总结: 消息超时的两种方式是?

给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息?

给消息的目标队列指定死信交换机将消费者监听的队列绑定到死信交换机发送消息时给消息设置超时时间为20秒

5.2.3 延迟队列

上面的死信交换机和 TTL 在我们项目中一般不使用,我们一般使用延迟队列来进行实现延迟发送效果 因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。 这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html 在使用的时候我们首先需要安装,这里不再演示 DelayExchange 原理 DelayExchange需要将一个交换机声明为 delayed 类型。当我们发送消息到 delayExchange 时,流程如下:

接收消息判断消息是否具备 x-delay 属性如果有 x-delay 属性,说明是延迟消息,持久化到硬盘,读取 x-delay 值,作为延迟时间返回routing not found结果给消息发送者x-delay 时间到期后,重新投递消息到指定队列

使用 DelayExchange

首先声明 DelayExchange 交换机

注解声明(推荐)

@RabbitListener(bindings = @QueueBinding(

value = @Queue("delay.queue"), // 队列

exchange = @Exchange(value = "delay.direct", delayed = "true"), // Dealay 交换机

key = "delay"

))

public void listenDelayQueue(String msg) {

log.info("接收到 delay.queue的延迟消息:{}", msg);

}

基于 Bean 方式

发送消息

发送消息时,一定要携带 x-delay 属性,指定延迟的时间:

@Test

public void testDelayMsg() {

// 创建消息

Message message = MessageBuilder

.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))

.setHeader("x-delay", 10000)

.build();

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

}

5.3 惰性队列

5.3.1 消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题 解决消息堆积有三种种思路:

增加更多消费者,提高消费速度在消费者内开启线程池加快消息处理速度扩大队列容积,提高堆积上限

5.3.2 惰性队列

从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的概念,也就是惰性队列。惰性队列的特征如下:

接收到消息后直接存入磁盘而非内存消费者要消费消息时才会从磁盘中读取并加载到内存支持数百万条的消息存储

惰性队列的优点有哪些?

基于磁盘存储,消息上限高没有间歇性的page-out,性能比较稳定

惰性队列的缺点有哪些?

基于磁盘存储,消息时效性会降低性能受限于磁盘IO

基于命令行设置lazy-queue 而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

rabbitmqctl set_policy Lazy "^simple.queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

rabbitmqctl :RabbitMQ的命令行工具set_policy :添加一个策略Lazy :策略名称,可以自定义"^lazy-queue$" :用正则表达式匹配队列的名字'{"queue-mode":"lazy"}' :设置队列模式为lazy模式--apply-to queues:策略的作用对象,是所有的队列

基于@Bean声明lazy-queue

@Bean

public Queue lazyQueue() {

return QueueBuilder.durable("lazy.queue")

.lazy() // 开启 x-queue-mode 为 lazy

.build();

}

基于@RabbitListener声明LazyQueue

@RabbitListener(queuesToDeclare = @Queue(

value = "lazy.queue",

durable = "true",

arguments = @Argument(name = "x-queue-mode", value = "lazy") // 惰性队列

))

public void listenLazyQueue(String msg) {

log.info("接收到 lazy.queue的消息:{}", msg);

}

5.4 MQ 集群

集群搭建,这部分一般开发不会搭建,而是运维搭建,了解即可,但是我们需要知道 MQ 的集群以及特点 RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:

普通模式:普通模式集群提高了并发能力,但是不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息,提高了数据的可用性。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

5.4.1 普通集群

普通集群,或者叫标准集群(classic cluster),具备下列特征

会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回队列所在节点宕机,队列中的消息就会丢失

5.4.2 镜像集群

在普通集群中,一旦创建队列的主机宕机,队列就会不可用。不具备高可用能力。如果要解决这个问题,必须使用官方提供的镜像集群方案 官方文档地址:https://www.rabbitmq.com/ha.html 镜像集群:本质是主从模式,具备下面的特征

交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。一个队列的主节点可能是另一个队列的镜像节点所有操作都是主节点完成,然后同步给镜像节点主宕机后,镜像节点会替代成新的主节点(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)

5.4.3 仲裁队列

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征

与镜像队列一样,都是主从模式,支持主从数据同步使用非常简单,没有复杂的配置主从同步基于Raft协议,强一致

用 Java 代码创建仲裁队列

@Bean

public Queue quorumQueue() {

return QueueBuilder

.durable("quorum.queue") // 持久化

.quorum() // 仲裁队列

.build();

}

SpringAMQP 连接 MQ 集群

spring:

rabbitmq:

addresses: 192.168.80.128:8071, 192.168.80.128:8072, 192.168.80.128:8073

username: muziteng

password: 806823

virtual-host: /

注意,这里用 address 来代替 host、port 方式

更多知识在我的语雀知识库:https://www.yuque.com/ambition-bcpii/muziteng

相关链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: