文章目录

MQ 简介1、简介2、MQ优缺点3、MQ应用场景4、AMQP 和 JMS5、常见的 MQ 产品

RabbitMQ 工作原理Linux 环境安装 RabbitMQ1、rmp安装法1.1 安装1.2 开启管理界面1.3 启动与停止1.4 创建新用户

2、docker安装法2.1 安装2.2 下载rabbitmq_delayed_message_exchange插件

RabbitMQ 工作模式1、简单模式(Hello Wold)2、工作队列模式(Work Queue)3、发布订阅模式(Publish/Subscribe)4、路由模式(Routing)5、通配符模式(Topics)6、远程调用模式(RPC, 不常用)

Springboot 整合 RabbitMQ消息的可靠性投递1、确认模式2、回退模式3、消费者消费确认模式

其他高级特性1、消费端限流2、不公平分发3、消息存活时间4、优先级队列

死信队列延迟队列

MQ 简介

1、简介

MQ 全称为 Message Queue,即消息队列。“消息队列” 是在消息的传输过程中保存消息的容器。 它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

消息,两台计算机间传送的数据单位。可以非常简单,也可以更复杂。

队列,数据结构中概念。在队列中,数据先进先出,后进后出。

2、MQ优缺点

优点:

应用解耦 生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可,而不需要和其他系统有耦合,这显然也提高了系统的扩展性。 异步提速 将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费。 削峰限流 先将短时间内高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。

缺点:

系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。 系统复杂度提高 MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。 一致性问题 A 系统处理完业务,通过 MQ 给 B、C、D 三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败,则会造成数据处理的不一致。

3、MQ应用场景

应用解耦

在电商平台中,用户下订单需要调用订单系统,此时订单系统还需要调用库存系统、支付系统、物流系统完成业务。此时会产生两个问题:

如果库存系统出现故障,会造成整个订单系统崩溃。如果需求修改,新增了一个X系统,此时必须修改订单系统的代码。

如果在系统中引入 MQ,即订单系统将消息先发送到 MQ 中,MQ 再转发到其他系统,则会解决以下问题:

由于订单系统只发消息给 MQ,不直接对接其他系统,如果库存系统出现故障,不影响整个订单。如果需求修改,新增了一个 X 系统,此时无需修改订单系统的代码,只需修改 MQ 将消息发送给 X 系统即可。

异步提速

如果订单系统同步访问每个系统,则用户下单等待时长为920;如果引入 MQ,则用户下单等待时长为25 削峰限流

假设我们的系统每秒只能承载 1000 请求,如果请求瞬间增多到每秒 5000,则会造成系统崩溃。此时引入 MQ 即可解决该问题。 使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被 “削” 掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做限流。

4、AMQP 和 JMS

AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受中间件产品、开发语言等条件的限制。类比 HTTP 协议。

JMS,即 Java Message Service,是 Java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。JMS API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。一种规范,和 JDBC、Jedis 担任的角色类似。

区别:

JMS 是定义了统一的接口来统一消息操作;AMQP 通过协议统一数据交换格式JMS 必须使用 Java 语言;AMQP 只是协议,与语言无关(跨语言)JMS 规定了两种消息模型(队列模型和发布订阅模型);AMQP 的消息模型更为丰富

5、常见的 MQ 产品

ActiveMQRabbitMQRocketMQKafka开发语言JavaErlangJavaScala & Java协议支持AMQP,REST,XMPP,STOMPAMQP,XMPP,SMTP,STOMP自定义自定义协议,社区封装了HTTP协议支持客户端支持语言Java,C,C++,Python,PHP等官方支持 Erlang,Java等,社区产出多种 API,几乎支持所有语言Java,C++官方支持 Java,社区产出多种 API,如 PHP,Python等单机吞吐量万级万级十万级十万级消息延迟毫秒级微秒级毫秒级毫秒以内高可用主从架构镜像集群模式分布式架构分布式架构消息可靠性较低概率丢失数据不丢失数据保证数据不丢失功能特性老牌产品,成熟度高,文档较丰富并发能力强,性能极其好,支持一些消息中间件的高级功能,延时低,社区活跃,管理界面较为丰富MQ性能比较完备,扩展性强,支持大量的消息中间件高级功能只支持主要的 MQ 功能(接收与发送),主要应用于大数据领域

RabbitMQ 工作原理

RabbitMQ 是由 Erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

官网地址:https://www.rabbitmq.com/which-erlang.html

Producer 消息的生产者。也是一个向交换机发布消息的客户端应用程序。 Connection 连接。生产者/消费者和 RabbitMQ 服务器之间建立的 TCP 连接。 Channel 信道。是 TCP 里面的虚拟连接。例如:Connection 相当于电缆,Channel 相当于独立光纤束,一条 TCP 连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。 Broker 消息队列服务器实体。即 RabbitMQ 服务器。 Virtual Host 虚拟主机。出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中。每个 Virtual Host 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个 RabbitMQ 服务器时,可以划分出多个虚拟主机。RabbitMQ 默认的虚拟主机路径是 / Exchange 交换机。用来接收生产者发送的消息,并根据分发规则,将这些消息分发给服务器中的队列中。不同的交换机有不同的分发规则。

rabbitMQ 交换机类型有4种:

direct(直连):它会把消息路由到那些 BindingKey RoutingKey完全匹配的队列中。fanout(扇形):它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。topic(主题):将消息路由到 BindingKey RoutingKey 相匹配的队列中。headers(标题):交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中headers 属性进行匹配。

直连交换机(dirext exchange)为 RabbitMQ 默认的交换机。

Queue 消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。消息一直在队列里面,等待消费者链接到这个队列将其取走。 Binding 消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中,作为消息的分发依据。 Consumer 消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。

拓展:RabbitMQ 为什么使用信道而不直接使用 TCP 连接通信?

TCP 连接的创建和销毁开销特别大。创建需要 3 次握手,销毁需要 4 次挥手。高峰时每秒成千上万条 TCP 连接的创建会造成资源巨大的浪费。而且操作系统每秒处理 TCP 连接数也是有限制的,会造成性能瓶颈。而如果一条线程使用一条信道,一条 TCP 链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

Linux 环境安装 RabbitMQ

1、rmp安装法

1.1 安装

# 安装erlang所需要的依赖

yum install -y epel-release

# 安装 Erlang

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

# 查看Erlang 是否安装成功

erl -version

# 安装 socat

rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm

# 安装 RabbitMQ

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

1.2 开启管理界面

rabbitmq-plugins enable rabbitmq_management

1.3 启动与停止

service rabbitmq-server start # 启动服务

service rabbitmq-server stop # 停止服务

service rabbitmq-server restart # 重启服务

# 如果这里访问不到: (http://ip:15672/ )关闭防火墙,并重新启动【guest / guest】

service iptables stop

1.4 创建新用户

# 创建账户

rabbitmqctl add_user admin 123

# 设置用户角色

rabbitmqctl set_user_tags admin administrator

# 设置用户权限

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

# 查看所有用户

rabbitmqctl list_users

2、docker安装法

2.1 安装

docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

2.2 下载rabbitmq_delayed_message_exchange插件

官网下载地址:https://www.rabbitmq.com/community-plugins.html

注意:具体下载哪个版本的插件,可进入容器内部查看。一定要大于等于当前版本。

[root@iZ2zeffygi8nlek3pfjco8Z ~]# docker exec -it e91 /bin/bash

root@e91d8abdddb4:/# rabbitmq-plugins list

# 将插件拷贝到docker容器

docker cp rabbitmq_delayed_message_exchange-3.9.0.ez e91:/opt/rabbitmq/plugins

# 开启管理界面

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

执行完命令之后,重新启动 RabbitMQ 容器,然后登录RabbitMQ 的Web端界面查看插件是否启动成功。

以下即为管控台界面:

RabbitMQ 工作模式

RabbitMQ 共有六种工作模式:

简单模式(Simple)工作队列模式(Work Queue)发布订阅模式(Publish/Subscribe)路由模式(Routing)通配符模式(Topics)远程调用模式(RPC,不常用)

1、简单模式(Hello Wold)

特点:

一个生产者对应一个消费者,通过队列进行消息传递。该模式使用 direct 交换机,direct 交换机是 RabbitMQ 默认交换机。

1.1 引入依赖

com.rabbitmq

amqp-client

5.14.0

1.2 生产者代码

package com.sea.rabbitmq.queue.simplequeue;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 简单模式生产者

*

* @author sea

* @date 2023-12-01

*/

public class Producer {

//队列名称

public static final String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws IOException, TimeoutException {

//1、创建连接工厂

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("47.94.151.26");

factory.setPort(5672);

factory.setUsername("guest");

factory.setPassword("guest");

factory.setVirtualHost("/");

//2、创建连接

Connection connection = factory.newConnection();

//3、建立信道

Channel channel = connection.createChannel();

//4、创建队列,如果队列存在,则直接使用

/**

* 方法参数:

* 参数1: 队列名称

* 参数2: 是否持久化。true表示队列会保存磁盘,MQ重启后队列还在

* 参数3: 是否私有化。true表示只有第一次拥有它的消费者才能访问,false表示所有消费者都能访问

* 参数4: 是否自动删除。true表示不再使用队列时自动删除队列

* 参数5: 队列其他参数。如:x-message-ttl,x-expires等

*/

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//5、发送消息

String message = "Hello rabbitmq";

/**

* 方法参数:

* 参数1: 交换机名,""表示默认交换机

* 参数2: 路由key,简单模式为队列名

* 参数3: 消息的其他属性,如路由头等

* 参数4: 消息体(字节数组格式)

*/

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

//6、关闭信道和连接

channel.close();

connection.close();

System.out.println("生产消息发送成功。");

}

}

运行生产者后,我们可以看到在 RabbitMQ 中创建了队列,队列中已经有了消息,具体详情如下: 1.3 消费者代码

package com.sea.rabbitmq.queue.simplequeue;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 简单模式消费者

*

* @author sea

* @date 2023-12-01

*/

public class Consumer {

//队列名称

public static final String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws IOException, TimeoutException {

//1、创建连接工厂

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("47.94.151.26");

factory.setPort(5672);

factory.setUsername("guest");

factory.setPassword("guest");

factory.setVirtualHost("/");

//2、创建连接

Connection connection = factory.newConnection();

//3、建立信道

Channel channel = connection.createChannel();

//4、监听队列

/**

* 方法参数:

* 参数1: 监听的队列名

* 参数2: 是否自动签收。如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息

* 参数3: Consumer的实现类,重写该类方法表示接收到消息后如何消费

*/

channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body);

System.out.println("接收到消息为:" + message);

}

});

}

}

2、工作队列模式(Work Queue)

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用 direct 交换机,应用于处理消息较多的情况。特点如下:

一个队列对应多个消费者。一条消息只会被一个消费者消费。消息队列默认采用 轮询 的方式将消息平均发送给消费者。

2.1 封装工具类

由于连接 RabbitMQ 的操作都一样,所以这里我们将代码抽取出来进行封装。

注意:没有关闭资源连接。

package com.sea.rabbitmq.queue.utils;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* Rabbitmq连接工具类

*

* @author sea

* @date 2023-12-01

*/

public class RabbitMqUtils {

public static Channel getChannel() throws IOException, TimeoutException {

// 1.创建连接工厂

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("47.94.151.26");

factory.setPort(5672);

factory.setUsername("guest");

factory.setPassword("guest");

factory.setVirtualHost("/");

// 2.创建连接

Connection connection = factory.newConnection();

// 3.建立信道

return connection.createChannel();

}

}

2.2 生产者代码

package com.sea.rabbitmq.queue.workqueue;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.MessageProperties;

import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 工作队列生产者

*

* @author sea

* @date 2023-12-01

*/

public class Producer {

//队列名称

public static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws IOException, TimeoutException {

//1、通过工具类创建连接

Channel channel = RabbitMqUtils.getChannel();

//2、创建队列

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

//3、发送批量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中

for (int i = 1; i <= 10; i++) {

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,

("发送成功,这是第 " + i + " 条消息").getBytes());

}

//4、关闭资源

channel.close();

}

}

2.3 消费者代码 这里我们编写两个消费者去消费生产出来的消息,两个消费者的代码大体一致,不同的是输出内容,所以此处只展示消费者01的代码:

package com.sea.rabbitmq.queue.workqueue;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 工作队列消费者1

*

* @author sea

* @date 2023-12-01

*/

public class Consumer1 {

//队列名称

private static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("消费者01消费成功,内容为: " + message);

}

});

}

}

2.4 结果 消费者01消费结果如下: 消费者02消费结果如下:

3、发布订阅模式(Publish/Subscribe)

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe),特点如下:

生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的 每个队列 中。发布订阅模式使用 fanout 交换机。工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。

3.1 生产者代码

package com.sea.rabbitmq.queue.publishqueue;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 发布订阅模式生产者

*

* @author sea

* @date 2023-12-01

*/

public class Producer {

//交换机名称

public static final String EXCHANGE_NAME = "fanout_exchange";

//队列名称,分别为:邮件队列、信息队列、站内队列

public static final String EMAIL_QUEUE_NAME = "send_email_queue";

private static final String MESSAGE_QUEUE_NAME = "send_message_queue";

private static final String STATION_QUEUE_NAME = "send_station_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

//创建fanout交换机

/**

* 参数1:交换机名

* 参数2:交换机类型

* 参数3:交换机是否持久化

*/

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);

//创建队列

channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null);

channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null);

channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null);

//交换机绑定队列

/**

* 参数1:队列名

* 参数2:交换机名

* 参数3:路由关键字,发布订阅模式写""即可

*/

channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "");

channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, "");

channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "");

//发送消息

String message = "京东618活动马上就要开始啦! 欢迎您登录京东参与。";

channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());

//关闭资源

channel.close();

}

}

3.1 消费者代码

这里我们编写三个消费者,分别为短信消费者、邮件消费者、站内信消费者,三者的代码大体都一致,不同的是监听的对列名以及输出内容,所以此处只展示短信消费者的代码:

package com.sea.rabbitmq.queue.publishqueue;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 发布订阅模式:短信消费者

*

* @author sea

* @date 2023-12-01

*/

public class MessageConsumer {

private static final String MESSAGE_QUEUE_NAME = "send_message_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

channel.basicConsume(MESSAGE_QUEUE_NAME, true, new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body);

System.out.println("发送短信成功,短信内容为: " + message);

}

});

}

}

注意:也可以使用 工作队列 + 发布订阅 模式同时使用,即两个消费者同时监听一个队列

4、路由模式(Routing)

使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如:电商网站的促销活动,618大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用 路由模式 (Routing) 完成这一需求。特点如下:

每个队列绑定路由关键字 RoutingKey生产者将带有 RoutingKey 的消息发送给交换机,交换机根据 RoutingKey 转发到指定队列。路由模式使用 direct 交换机。能按照路由键将消息发送给指定队列

3.1 生产者代码

由于此模式是在发布订阅模式基础上新增了路由绑定规则,进而实现将消息发送给指定队列的功能,所以代码与上面代码大致相同,不过添加了路由key,具体代码如下:

package com.sea.rabbitmq.queue.routingqueue;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 路由模式 生产者

*

* @author sea

* @date 2023-12-01

*/

public class Producer {

//交换机名称

private static final String EXCHANGE_NAME = "routing_exchange";

//队列名称,分别为: 邮箱队列、信息队列、站内信队列

private static final String EMAIL_QUEUE_NAME = "send_email_queue";

private static final String MESSAGE_QUEUE_NAME = "send_message_queue";

private static final String STATION_QUEUE_NAME = "send_station_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

//创建direct交换机

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

//创建队列

channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null);

channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null);

channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null);

//交换机绑定队列(重要的、普通的)

channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "important");

channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, "important");

channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "important");

channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "common");

//发送消息(重要消息全部发送,否则只发送站内消息)

String importMessage = "京东618活动马上就要开始啦! 欢迎您登录京东参与";

String commonMessage = "京东满减优惠开始啦,诚邀您参与";

channel.basicPublish(EXCHANGE_NAME, "important", null, importMessage.getBytes());

channel.basicPublish(EXCHANGE_NAME, "common", null, commonMessage.getBytes());

//关闭资源

channel.close();

}

}

3.1 消费者代码

消费者可以直接使用上面发布订阅模式的三个消费者,故此处就不再展示。

下面为站内信消费者的消费结果,其他两个消费者的结果为一条,站内信消费者的结果为两条:

5、通配符模式(Topics)

通配符模式(Topics)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的 RoutingKey 能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,通配符模式使用 topic 交换机。 能按照通配符规则将消息发送给指定队列。

通配符规则如下:

队列设置 RoutingKey 时,# 可以匹配任意多个单词,* 可以匹配任意一个单词。消息设置 RoutingKey 时,RoutingKey 由多个单词构成,中间以 . 分割。

3.1 生产者代码

由于此模式是在路由模式基础上给队列绑定了带通配符的路由关键字,进而实现将消息按照通配符规则发送给指定队列,所以代码与上面代码大致相同,具体代码如下:

package com.sea.rabbitmq.queue.topicsqueue;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.sea.rabbitmq.queue.utils.RabbitMqUtils;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* 通配符模式 生产者

*

* @author sea

* @date 2023-12-01

*/

public class Producer {

//交换机名称

private static final String EXCHANGE_NAME = "topic_exchange";

//队列名称,分别为: 邮箱队列、信息队列、站内信队列

private static final String EMAIL_QUEUE_NAME = "send_email_queue";

private static final String MESSAGE_QUEUE_NAME = "send_message_queue";

private static final String STATION_QUEUE_NAME = "send_station_queue";

public static void main(String[] args) throws IOException, TimeoutException {

Channel channel = RabbitMqUtils.getChannel();

//创建direct交换机

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);

//创建队列

channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null);

channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null);

channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null);

//交换机绑定队列

channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "#.email.#");

channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, "#.message.#");

channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "#.station.#");

//发送消息

String importMessage = "京东618活动马上就要开始啦! 欢迎您登录京东参与";

String commonMessage = "京东满减优惠开始啦,诚邀您参与";

//类似于模糊匹配

channel.basicPublish(EXCHANGE_NAME, "email.message.station", null, importMessage.getBytes());

channel.basicPublish(EXCHANGE_NAME, "station", null, commonMessage.getBytes());

//关闭资源

channel.close();

}

}

3.1 消费者代码 消费者可以直接使用上面发布订阅模式的三个消费者,故此处就不再展示。

下面为站内信消费者的消费结果,其他两个消费者的结果为一条,站内信消费者的结果为两条:

6、远程调用模式(RPC, 不常用)

Springboot 整合 RabbitMQ

1、添加依赖

org.springframework.boot

spring-boot-starter-amqp

org.projectlombok

lombok

2、配置文件

server:

port: 8081

spring:

rabbitmq:

host: 47.94.151.26

port: 5672

username: guest

password: guest

virtual-host: /

3、配置类

SpringBoot 整合 RabbitMQ 时,需要在配置类中创建队列和交换机。

package com.sea.rabbitmq.boot;

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

/**

* RabbitMq配置类

*

* @author sea

* @date 2023-12-01

*/

@Configuration

public class RabbitMqConfig {

private static final String TOPIC_EXCHANGE_NAME = "topicExchange";

private static final String QUEUE_NAME = "bootQueue";

/**

* 创建topic类型交换机

*/

@Bean("topicExchange")

public Exchange getExchange() {

return ExchangeBuilder

//交换机类型

.topicExchange(TOPIC_EXCHANGE_NAME)

//是否持久化

.durable(true)

.build();

}

/**

* 创建队列

*/

@Bean("bootQueue")

public Queue getQueue() {

return QueueBuilder

//持久化队列

.durable(QUEUE_NAME)

.build();

}

/**

* 交换机与队列绑定

*/

@Bean

public Binding bingMessageQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {

return BindingBuilder

.bind(queue)

.to(exchange)

//路由规则

.with("#.message.#")

//没有其他参数

.noargs();

}

}

4、生产者 SpringBoot 整合 RabbitMQ 时,提供了工具类 RabbitTemplate 发送消息,编写生产者时只需要注入 RabbitTemplate 即可发送消息。

package com.sea.rabbitmq.boot;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

/**

* 消费者

*

* @author sea

* @date 2023-12-01

*/

@Component

public class Consumer {

/**

* 监听队列

*/

@RabbitListener(queues = "bootQueue")

public void listenMessage(String message) {

System.out.println("整合boot,接收消息:" + message);

}

}

5、消费者

此处需要编写另一个 SpringBoot 项目作为 RabbitMQ 的消费者,因为如果在一个项目中可以通过方法调用就行了,没有必要通过 RabbitMQ 来进行通信了

搭建 RabbitMQ 的 SpringBoot 项目步骤与前面相同,这里直接展示消费者代码:

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* SpringBoot 整合 RabbitMQ 生产者

*/

@Test

public void testSendMessage() {

/**

* 发送消息

* 1.exchange: 交换机

* 2.routingKey: 路由key

* 3.object: 发送的消息

*/

rabbitTemplate.convertAndSend("topicExchange", "message", "快来参加京东618活动哦,整合boot。");

}

@RabbitListener 注解用来监听队列,放在具体的方法上面。

消息的可靠性投递

RabbitMQ 消息的投递路径为:

​ 生产者 ------> 交换机 ------> 队列 ------> 消费者

在 RabbitMQ 工作的过程中,每个环节消息都有可能传递失败,RabbitMQ 可以通过以下三种模式来监听消息时候投递成功:

确认模式(Confirm):可以监听消息是否从生产者成功传递到交换机。退回模式(Return):可以监听消息是否从交换机成功传递到队列。消费者消息确认(Consumer Ack):可以监听消费者是否成功处理消息。

1、确认模式

1.1 生产者模块配置文件

spring:

rabbitmq:

# 开启确认模式

publisher-confirm-type: correlated

spring.rabbitmq.publisher-confirm-type 属性有三个值:

NONE:禁用发布确认模式,默认值CORRELATED:发布消息成功到交换器后会触发回调方法,一般使用此值SIMPLE:简单使用

CORRELATED 和 SIMPLE 的区别:

CORRELATED:发布者将收到一个带有确认标识 的确认消息,该标识与发布的每个消息相关联。这种模式可以确保每个消息被确认,但需要在发送和接收确认之间进行映射。SIMPLE:发布者将仅收到一条确认消息,表示所有消息都已经成功发布。这种模式相对简单,但不能保证每个消息都成功确认。

1.2 回调接口

package com.sea.rabbitmq.confirm;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**

* 确定模式的回调方法,消息向交换机发送后调用confirm()方法

*

* @author sea

* @date 2023-12-01

*/

@Component

@Slf4j

public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostConstruct

public void setConfirmCallback() {

//将当前实现类注入到rabbitTemplate的确认回调

rabbitTemplate.setConfirmCallback(this);

}

/**

* 交换机确认回调接口

*

* @param correlationData 存储消息的ID和自己存储的关于该条消息的信息

* @param ack 交换机是否接收成功

* @param cause 异常原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

String id = null != correlationData ? correlationData.getId() : "";

if (ack) {

log.info("交换机接收到id为:{}的消息", id);

} else {

log.info("交换机未接收到id为:{}的消息,原因为{}", id, cause);

//TODO 交换机未收到消息,可以进行对应的业务处理

}

}

}

1.3 控制类

package com.sea.rabbitmq.confirm;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

/**

* @author sea

* @date 2023-12-01

*/

@RestController

@Slf4j

@RequestMapping("/producer")

public class ProducerController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/confirm/{msg}")

public void sendMsgWithConfirm(@PathVariable String msg) {

rabbitTemplate.convertAndSend("topicExchange", "message", msg, new CorrelationData());

log.info("生产者发送消息:{}", msg);

}

}

1.4 使用 Postman 进行测试

正常发送,控制台输出如下: 发送失败的情况,将交换机的名称改为不存在的,控制台输出如下:

2、回退模式

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的,此时通过开始消息回退将消息传递过程中不可达目的地时将消息返回给生产者。

退回模式可以监听消息是否从交换机成功传递到队列,具体使用如下:

2.1 添加配置

spring:

rabbitmq:

# 开启回退模式

publisher-returns: true

2.2 回调接口

package com.sea.rabbitmq.confirm;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.ReturnedMessage;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**

* 回退模式的回调方法,交换机发送到队列失败后才会执行 returnedMessage()方法

*

* @author sea

* @date 2023-12-01

*/

@Component

@Slf4j

public class MyReturnCallback implements RabbitTemplate.ReturnsCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostConstruct

public void setReturnsCallback() {

rabbitTemplate.setReturnsCallback(this);

}

/**

* 消息回退接口,只有当消息无法传递到目的地时才进行回退

*

* @param returned 失败后将失败信息封装到参数中

*/

@Override

public void returnedMessage(ReturnedMessage returned) {

log.error("消息 {}, 被交换机 {} 退回, 应答代码 {}, 原因 {}, 路由 {}",

new String(returned.getMessage().getBody()),

returned.getExchange(),

returned.getReplyCode(),

returned.getReplyText(),

returned.getRoutingKey());

//TODO 消息回退,可以进行对应的业务处理

}

}

1.4 使用 Postman 进行测试

发送成功不会打印对应日志,只有当消息发送失败时才进行回退,才会打印对应日志。

发送失败的情况,将路由的名称改为不存在的,控制台输出如下:

3、消费者消费确认模式

在 RabbitMQ 中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。

这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。消息分为 自动确认 和 手动确认。

自动确认,指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

自动确认:spring.rabbitmq.listener.simple.acknowledge-mode = none 手动确认:spring.rabbitmq.listener.simple.acknowledge-mode = manual

注意:此模式的配置要配置在消费端

3.1 消费者添加配置

spring:

rabbitmq:

listener:

simple:

# 开启手动确认模式

acknowledge-mode: manual

3.2 手动确认模式的消费者

package com.sea.rabbitmq.confirm;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

/**

* 手动确认模式的消费者

*

* @author sea

* @date 2023-12-01

*/

@Component

public class AckConsumer {

@RabbitListener(queues = "bootQueue")

public void listenMsg(Message message, Channel channel) throws IOException {

//消息投递号,每次投递消息该值都会加1

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

// int i = 1/0;

System.out.println("成功接收到消息:" + message);

/**

* 手动签收消息

* deliveryTag: 消息投递号

* true: 是否可以一次签收多条消息

*/

channel.basicAck(deliveryTag, true);

} catch (Exception e) {

System.out.println("消息接收失败!");

/**

* 拒签消息,设置消息重回队列中

* deliveryTag: 消息投递号

* true: 是否可以一次拒签多条消息

* true: 拒签后消息是否重回队列,不放回消息则会放到死信队列

*/

channel.basicNack(deliveryTag, true, true);

}

}

}

注意:上面代码由于模拟运行时异常,拒签后重新放回队列中,然后重新执行,所以会一直输出"消息接收失败!"

其他高级特性

1、消费端限流

假想我们 RabbtiMQ 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,巨量的消息瞬间全部喷涌推动过来,但是单个客户端无法同时处理这么多条数据,就会被压垮崩溃。

RabbitMQ 提供了一种 Qos(Quality Of Service,服务质量)服务质量保证功能。即在非自动确认消息的前提下,如果一定数目的消息未被确认之前,不再进行消费新的消息。

我们可以通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

拓展:预取值(Prefetch) 在 RabbitMQ 中,预取值(Prefetch Count)是指消费者从队列中预取的消息数量。当一个消费者连接到一个队列并开始消费消息时,它可以通过设置预取值来控制一次从队列中获取的消息数量。预取值可以在消费者创建时进行设置,也可以在运行时进行更改。

预取值的主要作用:是控制消费者的负载,避免一个消费者在处理消息时占用过多的资源,导致其他消费者无法获得足够的资源。通过限制每次预取的消息数量,可以控制消费者的处理速度,避免过度消费队列中的消息。

预取值的设置方式有两种:

全局设置:通过 channel.basicQos(prefetchCount) 方法设置全局预取值。在这种情况下,所有的消费者都将使用相同的预取值。单独设置:通过 channel.basicConsume(queue, consumer) 方法的 prefetchCount 参数设置单独的预取值。在这种情况下,每个消费者都可以使用不同的预取值。

需要注意的是,预取值并不是绝对的,它只是一个提示值。当消费者处理完预取值数量的消息后,它可以继续从队列中获取更多的消息,不受预取值的限制。同时,当队列中的消息数量少于预取值时,消费者将无法获取更多的消息,直到队列中有新的消息可用。因此,预取值的设置应该根据实际情况进行调整,以保证消费者的负载均衡和队列的稳定性

1.1 消费端配置

spring:

rabbitmq:

listener:

simple:

# 开启手动确认模式(消费端限流必须开启)

acknowledge-mode: manual

# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息

prefetch: 5

1.2 生产者发送多条消息

/**

* 消费端限流、不公平分发 生产者

*/

@Test

public void testSendBatchMsg() {

for (int i = 0; i < 10; i++) {

rabbitTemplate.convertAndSend("topicExchange", "message", i + " 快来参加京东618活动哦,消费端限流。");

}

}

1.3 消费者监听队列

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

/**

* 消费端限流 消费者

*

* @author sea

* @date 2023-12-01

*/

@Component

@Slf4j

public class LimitConsumer {

@RabbitListener(queues = "bootQueue")

public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {

// 1.获取消息

String msg = new String(message.getBody());

log.info("接收到的消息为: {}", msg);

// 2.模拟业务处理

Thread.sleep(3000);

// 3.手动签收消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);

}

}

1.4 消费过程中管控台如下

1.5 消费结果如下

2、不公平分发

在 RabbitMQ 中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1 处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1 有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

2.1 消费端配置

spring:

rabbitmq:

listener:

simple:

# 开启手动确认模式(消费端限流必须开启)

acknowledge-mode: manual

# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息

# prefetch: 5

# 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发

prefetch: 1

2.2 生产者发送多条消息

与上面一致即可

2.3 添加两个效率不一样的消费者

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

/**

* 不公平分发 消费者

*

* @author sea

* @date 2023-12-01

*/

@Component

@Slf4j

public class UnfairConsumer {

@RabbitListener(queues = "bootQueue")

public void listenMsgOne(Message message, Channel channel) throws InterruptedException, IOException {

// 1.获取消息

String msg = new String(message.getBody());

log.info("消费者 1 接收到的消息为: {}", msg);

// 2.模拟业务处理, 快

Thread.sleep(500);

// 3.手动签收消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);

}

@RabbitListener(queues = "bootQueue")

public void listenMsgTwo(Message message, Channel channel) throws InterruptedException, IOException {

// 1.获取消息

String msg = new String(message.getBody());

log.info("消费者 2 接收到的消息为: {}", msg);

// 2.模拟业务处理, 慢

Thread.sleep(3000);

// 3.手动签收消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);

}

}

2.4 消费端实现了不公平分发效果

3、消息存活时间

RabbitMQ 可以设置消息的存活时间(Time To Live,简称TTL),单位是毫秒,当消息到达存活时间后还没有被消费,会被移出队列。

RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。具体使用如下:

消息到达存活时间未被消费时,消息会被放入死信队列。

3.1 对队列的所有消息设置存活时间

在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中)

/**

* 创建有存活时间的消息队列

*/

@Bean("ttlQueue")

public Queue getTtlQueue() {

return QueueBuilder

.durable("ttlQueue")

//10s消息过期

.ttl(10000)

.build();

}

/**

* 绑定带有存活时间的队列

*/

@Bean

public Binding bingTtlQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("ttlQueue") Queue queue) {

return BindingBuilder

.bind(queue)

.to(exchange)

.with("ttl")

.noargs();

}

3.2 生产者代码

/**

* 消息存活时间 生产者

*/

@Test

public void testSendMsgWithTtl() {

//设置消息属性

MessageProperties messageProperties = new MessageProperties();

messageProperties.setExpiration("10000");

//创建消息对象

Message message = new Message("测试发送带有过期时间的消息。".getBytes(StandardCharsets.UTF_8), messageProperties);

//发送消息

rabbitTemplate.convertAndSend("topicExchange", "ttl", message);

}

3.3 消费者代码

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

/**

* 消息存活时间 消费者

*

* @author sea

* @date 2023-12-01

*/

@Component

@Slf4j

public class WithTtlConsumer {

@RabbitListener(queues = "ttlQueue")

public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {

// 1.获取消息

String msg = new String(message.getBody());

log.info("接收到的消息为: {}", msg);

// 2.模拟业务处理

Thread.sleep(2000);

// 3.手动签收消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);

}

}

3.4 测试结果 注意:

如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

4、优先级队列

RabbitMQ 优先级队列(Priority Queue)是一种特殊的队列,它根据消息的优先级将其放置在队列中。当消费者从队列中获取消息时,它将按照优先级从高到低的顺序获取消息。优先级队列可以用于处理一些需要按照优先级处理的消息,例如日志记录、任务调度等。具体使用如下:

4.1 对队列的所有消息设置存活时间

在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中)

/**

* 创建有优先级的消息队列

*/

@Bean("priorityQueue")

public Queue getPriorityQueue() {

return QueueBuilder

.durable("priorityQueue")

//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源

.maxPriority(10)

.build();

}

/**

* 绑定带有优先级的队列

*/

@Bean

public Binding bingPriorityQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("priorityQueue") Queue queue) {

return BindingBuilder

.bind(queue)

.to(exchange)

.with("priority")

.noargs();

}

4.2 生产者代码

/**

* 优先级队列 生产者

*/

@Test

public void testPriority() {

for (int i = 0; i < 50; i++) {

//i为5的倍数时优先级较高

if (i % 5 == 0) {

MessageProperties messageProperties = new MessageProperties();

//设置优先级

messageProperties.setPriority(9);

//创建消息对象

Message message = new Message((i + " 测试发送带有优先级的消息。").getBytes(StandardCharsets.UTF_8), messageProperties);

//发送消息

rabbitTemplate.convertAndSend("topicExchange", "priority", message);

} else {

rabbitTemplate.convertAndSend("topicExchange", "priority", i + " 普通级别的消息");

}

}

}

4.2 消费者代码

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

/**

* 优先级队列 消费者

*

* @author sea

* @date 2023-12-01

*/

@Component

@Slf4j

public class PriorityConsumer {

@RabbitListener(queues = "priorityQueue")

public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {

// 1.获取消息

String msg = new String(message.getBody());

log.info("接收到的消息为: {}", msg);

Thread.sleep(2000);

// 2.手动签收消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);

}

}

4.4 测试结果

死信队列

在 MQ 中,当消息在队列中由于某些原因没有被及时消费而变成死信(Dead Message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在 RabbitMQ 中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。

死信交换机 和 死信队列与普通的没有区别。死信队列只是一种特殊的队列,里面的消息仍然可以消费。

消息成为死信的情况:

队列消息长度到达限制消费者拒签消息,并且不把消息重新放入原队列消息到达存活时间未被消费

实现流程如下以及具体代码如下: 1. 创建普通队列和死信队列配置

在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中)

public static final String DEAD_EXCHANGE_NAME = "deadExchange";

public static finalString DEAD_QUEUE_NAME = "deadQueue";

public static final String NORMAL_EXCHANGE_NAME = "normalExchange";

public static final String NORMAL_QUEUE_NAME = "normalQueue";

/**

* 创建死信交换机,与普通交换机一样

*/

@Bean("deadExchange")

public Exchange getDeadExchange() {

return ExchangeBuilder

.topicExchange(DEAD_EXCHANGE_NAME)

.durable(true)

.build();

}

/**

* 创建死信队列,与普通队列一样

*/

@Bean("deadQueue")

public Queue getDeadQueue() {

return QueueBuilder

.durable(DEAD_QUEUE_NAME)

.build();

}

/**

* 死信交换机绑定死信队列

*/

@Bean

public Binding bingDeadQueue(@Qualifier("deadExchange") Exchange exchange, @Qualifier("deadQueue") Queue queue) {

return BindingBuilder

.bind(queue)

.to(exchange)

.with("dead_route")

.noargs();

}

/**

* 创建普通交换机

*/

@Bean("normalExchange")

public Exchange getNormalExchange() {

return ExchangeBuilder

.topicExchange(NORMAL_EXCHANGE_NAME)

.durable(true)

.build();

}

/**

* 创建普通队列

*/

@Bean("normalQueue")

public Queue getNormalQueue() {

return QueueBuilder

.durable(NORMAL_QUEUE_NAME)

// 绑定死信交换机

.deadLetterExchange(DEAD_EXCHANGE_NAME)

// 死信队列路由关键字

.deadLetterRoutingKey("dead_route")

// 消息存活10s(此队列消息过期会变成死信)

.ttl(10000)

// 队列最大长度为10(此队列的长度大于10会变成死信)

.maxLength(10)

.build();

}

/**

* 普通交换机绑定普通队列

*/

@Bean

public Binding bingNormalQueue(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {

return BindingBuilder

.bind(queue)

.to(exchange)

.with("normal_route")

.noargs();

}

2. 生产者代码

/**

* 测试死信队列

*/

@Test

public void testDLX(){

// 1.存活时间过期后变成死信

rabbitTemplate.convertAndSend("normalExchange","normal_route","测试消息过期,消息会成为死信");

// 2.超过队列长度后变成死信

for (int i = 0; i < 15; i++) {

rabbitTemplate.convertAndSend("normalExchange","normal_route","测试超过队列长度,消息会成为死信");

}

// 3.消息拒签但不返回原队列后变成死信

rabbitTemplate.convertAndSend("normalExchange","normal_route","测试消费者拒签并且不放回队列,消息会成为死信");

}

3. 消费者代码

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

import java.util.Date;

/**

* 死信队列 消费者

*

* @author sea

* @date 2023-12-04

*/

@Component

@Slf4j

public class DeadConsumer {

@RabbitListener(queues ="normalQueue")

public void listenMessage(Message message, Channel channel) throws IOException {

// 拒签消息

channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);

}

@RabbitListener(queues = "deadQueue")

public void receiveD(Message message, Channel channel){

String msg = new String(message.getBody());

log.info("当前时间{},收到死信队列的消息:{}", new Date(), msg);

}

}

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

但 RabbitMQ 中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。

延迟队列:TTL + 死信队列的合体

1. 创建延迟配置

在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中)

public static final String DELAYED_EXCHANGE = "delayed_exchange";

public static final String DELAYED_QUEUE = "delayed_queue";

public static final String DELAYED_ROUTE_KEY = "delayed_route";

/**

* 创建延迟交换机

*/

@Bean("delayed_exchange")

public Exchange getDelayedExchange() {

// 创建自定义交换机

Map args = new HashMap<>(1);

// topic类型的延迟交换机

args.put("x-delayed-type", "topic");

/**

* 参数1: 交换机名称

* 参数2: 交换机类型(x-delayed-message代表延迟交换机)

* 参数3: 是否持久化

* 参数4: 是否自动删除

* 参数5: 额外参数

*/

return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);

}

/**

* 创建延迟队列

*/

@Bean("delayed_queue")

public Queue getDelayedQueue() {

return QueueBuilder

.durable(DELAYED_QUEUE)

.build();

}

/**

* 绑定延迟队列

*/

@Bean

public Binding bindingDelayedQueue(@Qualifier("delayed_queue") Queue queue, @Qualifier("delayed_exchange") Exchange exchange) {

return BindingBuilder

.bind(queue)

.to(exchange)

.with(DELAYED_ROUTE_KEY)

.noargs();

}

2. 生产者代码

package com.sea.rabbitmq.advanced;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**

* 延迟队列 生产者

*

* @author sea

* @date 2023-12-04

*/

@RestController

@Slf4j

public class DelayedController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("sendDelayed")

public void sendDelayed(String message, Integer delayedTime) {

log.info("当前时间:{},发送一条过期时间{}信息给delayed交换机:{}", new Date(), delayedTime, message);

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setDelay(delayedTime);

return message;

}

};

rabbitTemplate.convertAndSend("delayed_exchange", "delayed_route", message, messagePostProcessor);

//另一种写法

// rabbitTemplate.convertAndSend("delayed_exchange", "delayed_route", message,

// msg -> {

// //设置延迟时间

// msg.getMessageProperties().setDelay(delayedTime);

// return msg;

// });

}

}

3. 消费者代码

package com.sea.rabbitmq.advanced;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.util.Date;

/**

* 延迟队列 消费者

*

* @author sea

* @date 2023-12-04

*/

@Component

@Slf4j

public class DelayedConsumer {

@RabbitListener(queues = "delayed_queue")

public void receive(Message message, Channel channel) {

String msg = new String(message.getBody());

log.info("当前时间{},收到延时交换机的消息:{}", new Date(), msg);

}

}

如果有收获! 希望老铁们来个三连,点赞、收藏、转发。

创作不易,别忘点个赞,可以让更多的人看到这篇文章,顺便鼓励我写出更好的博客

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: