注释上有个错误
生产者
/* 1.队列名称 2.队列里面的消息是否持久化(磁盘),默认情况消息存储在内存中。false表示不持久化,true表示持久化。 3.该队列是否排他,即是否只有声明它的连接可用,并在连接关闭后自动删除。true表示是排他的。 4.是否自动删除,队列不再使用时是否自动删除队列,而不是删除消息。true表示自动删除队列。 5.其他参数,可以是null或者包含更多结构化配置的Map,如关于延迟消息和死信消息的设置。 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//1.交换机可以""表示默认 2.路由键(Routing Key)队列名称 3.是否持久化消息 4.二进制消息体
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
简介
MQ(Message queue),本质是一个队列,FIFO先进先出,只不过队列中存放的是message.
是一种跨进程的通信机制,用于上下游传递消息.
为什么时候MQ
1.流量削峰.例如一个应用最大TPS为1W,但是高峰时期需要承受2W的TPS,这个时候可以使用消息队列做缓冲,分散时间来处理.总比不能处理强
2.应用解耦.当A应用与B应用直接连接.B应用如果需要维护,这A应用也需要停止,当中间使用MQ的话如果B应用需要修复,MQ会将B应用要处理的内容缓存在消息队列中,等待B应用修复
3.异步处理.例如A应用调用B应用,但是B应用需要很长时间处理才能完成,当B应用处理完成可以通过MQ转告A应用.
MQ的分类:
1.Kafka:用于大数据领域的消息传输.百万计TPS吞吐量,在数据采集,传输,存储的过程中发挥重要作用.优点性能卓越
2.RocketMQ.出自阿里巴巴,用Java实现,设计时参考Kafka,被阿里广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理等场景.屯出量十万级.分布式架构,消息0丢失,缺点,目前是支持java及C++,且C++不成熟,没有在MQ核心中实现JMS接口
3.RabbitMQ.是一个AMQP(高级消息队列协议)基础上完成,可复用的企业消息系统,主流MQ消息中间件之一,erlang语言开发,吞吐量万级,更新频繁
RabbitMQ的主要作用用于消息接收--->消息存储---->消息转发
四大核心概念
1.生产者:产生数据发送消息的应用是生产者
2.交换机:交换机是RabbitMQ一个重要部件,一方面接收来自生产者的消息,另一方面它将消息推送到队列中.交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或是把消息丢弃,这个得有交换机类型决定
3.队列.队列是RabbitMQ内部使用得一种数据结构,消息流经RabbitMQ和应用程序,只能存储在队列中,队列仅受主机得内存和磁盘限制得约束,本质是一个大得消息缓冲区.生产者可以将消息发送到一个队列,消费者可以从队列接收数据.
4.消费者.接收消息的一方.消费者大多是一个等待接收消息的应用.
注意:消费者和生产者和消息中间件很多时候并不在同一台服务器中.同一个应用程序既可以是生产者又可以是消费者.
RabbitMQ核心学习的六个部分(模式)
1.Hello World:简单模式
2.Work queue:工作模式
3.Publish/Subscribe:发布订阅模式
4.Routing:路由模式
5.Topics:主题模式
6.Publisher Confirms:发布确认模式
RabbitMQ安装
www.rabbitmq.com
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
用这个命令 需要用 ctrl+p+q
可以后台运行 建议后台 不然关闭后找不到容器
docker run -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
如果你不用docker需要先安装erlang环境
连接:IP:15672 登录账密 guest guest
老版本会有不能登录的情况
进入rabbitmq
docker exec -it rabbitmq /bin/bash
如果是老版本可以这么操作
创建账户
rabbitmqctl add_user admin 123456
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限 所有资源的配置,读写权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
查看当前所有用户
rabbitmqctl list_users
Hello Wordl简单模式
生产者将消息发送给MQ
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接RabbitMQ的队列
factory.setHost("XX.XX.XXX.X");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123456");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
//会有默认交换机(Exchange) 简单模式不搞太复杂
/*
生成一个队列
参数:
1.队列名称
2.队列里面的消息是否持久化(磁盘)默认情况消息存储在内存中 false不持久化 true持久化
3.该队列是否只供一个消费者进行,是否进行消费共享,true表示可以多个消费者消费
4.是否自动删除,最后一个消费者断开连接之后,该队列是否自动删除消息,true自动删除消息
5.其他参数,关于延迟消息,死信消息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发消息
String message="Hello World";
/*
信到发送消息 简单模式 暂不考虑交换机
1.发送到哪个交换机
2.表示路由的Key 本次是队列名
3.其他参数信息
4.发送消息的消息体(二进制)
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("消息发送完毕");
}
}
执行
消费者接收消息
public class Consumer {
//队列名称
private final static String QUEUE_NAME = "hello";
//接收消息
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xxx.xx.xx.x");
factory.setUsername("admin");
factory.setPassword("123456");
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
// 消费者成功接收到消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到的消息: " + message);
};
//4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
CancelCallback cancelCallback=consumerTag->{
System.out.println("消息接收(消费)被中断......");
};
/**
* 消费者消费消息 有自动应答 手动应答的方法
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true表示自动应答 false手动应答
* 3.消费者成功接收到消息的回调
* 4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
*/
channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
}
}
Work Queues工作模式
工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成.相反我们安排任务在之后执行.我们把任务封装为消息并将其发送到队列.在后台运行的工作进程将弹出任务并最终执行作业.当有多个工作线程时,这些工作线程将一起处理这些任务。
1.轮训分发消息
上面生产者和消费者中用到的代码可以抽取一个连接工厂工具类
public class RabiitMQUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
生产者
public class Task {
//队列名称
private static final String QUEUE_NAME="hello";
//发送大量消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//会有默认交换机(Exchange) 简单模式不搞太复杂
/*
生成一个队列
参数:
1.队列名称
2.队列里面的消息是否持久化(磁盘)默认情况消息存储在内存中 false不持久化 true持久化
3.该队列是否只供一个消费者进行,是否进行消费共享,true表示可以多个消费者消费
4.是否自动删除,最后一个消费者断开连接之后,该队列是否自动删除消息,true自动删除消息
5.其他参数,关于延迟消息,死信消息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发消息 本次用控制台输入
Scanner scanner=new Scanner(System.in);
String message="";
while(scanner.hasNext()){
message=scanner.nextLine();
/*
信到发送消息 简单模式 暂不考虑交换机
1.发送到哪个交换机
2.表示路由的Key 本次是队列名
3.其他参数信息
4.发送消息的消息体(二进制)
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("消息发送完毕");
}
}
}
消费者1
public class Work01 {
//队列名称
private static final String QUEUE_NAME="hello";
//接收消息
public static void main(String[] args) throws Exception {
//获取信道
Channel channel = RabbitMQUtils.getChannel();
// 消费者成功接收到消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Work1接收到的消息: " + message);
};
//4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
CancelCallback cancelCallback= consumerTag->{
System.out.println("消息接收(消费)被中断......");
};
/**
* 消费者消费消息 有自动应答 手动应答的方法
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true表示自动应答 false手动应答
* 3.消费者成功接收到消息的回调
* 4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
*/
System.out.println("Work1等待接收消息");
channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
}
}
消费者2
public class Work02 {
//队列名称
private static final String QUEUE_NAME="hello";
//接收消息
public static void main(String[] args) throws Exception {
//获取信道
Channel channel = RabbitMQUtils.getChannel();
// 消费者成功接收到消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Work2接收到的消息: " + message);
};
//4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
CancelCallback cancelCallback= consumerTag->{
System.out.println("消息接收(消费)被中断......");
};
/**
* 消费者消费消息 有自动应答 手动应答的方法
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true表示自动应答 false手动应答
* 3.消费者成功接收到消息的回调
* 4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
*/
System.out.println("Work2等待接收消息");
channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
}
}
2.消息应答
概念:
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然挂掉了.RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除.在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息.以及后续发送给该消费者的消息,因为它可能无法接收到.因此为了保证消息在发送过程中不丢失.RabbitMQ引入了消息应答机制,就是说:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了.
也就是说让RabbitMQ等待消费者把消息处理完成之后再删除,消费者回调告诉MQ我处理完了.
消息应答又分为两种
1.自动应答
消息发送之后被立即认为已经传送成功,实际处理如何?成功了吗?不知道, 只是说MQ已经把消息给你了.一般来说这种在高吞吐量情况下使用.要和数据传输安全性权衡.如果用这种应答方式,消息可能丢失.这种模式仅适用在消费者可以高效处理消息情况下适用.
消费者接到消息就和MQ说我完成了,实际上最后有没有完成不知道.
建议:尽量不使用自动应答,除非真的无关紧要
2.手动应答
手动应答的方式
缺少的这个参数Mutiple是批量处理参数
信道里可能有很多消息,平时建议设置为false,处理一条,回调一条
设置为true,可能造成消息丢失(原因是你可能只处理了一条消息,但是应答的是信道中所有消息)
3.消息自动重新入队
如果消费者由于某些原因失去连接(通道关系,连接关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队,如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者,这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息
消息手动应答代码相关
消息手动应答主要在消费者这边,代码的修改也在消费者这边
下面来模拟手动应答情况下:消费者1在某种情况下关闭了线程,消息自动重新入队,交由消费者2完成(处理)的情况,保证消息不丢失 证明消息在手动应答时候不丢失
生产者
public class Task2 {//消息在手动应答时是不丢失的,如果出现问题,放回队列重新消费
//队列名称
private static final String QUEUE_NAME="ack_queue";
//发送大量消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//会有默认交换机(Exchange) 简单模式不搞太复杂
/*
生成一个队列
参数:
1.队列名称
2.队列里面的消息是否持久化(磁盘)默认情况消息存储在内存中 false不持久化 true持久化
3.该队列是否只供一个消费者进行,是否进行消费共享,true表示可以多个消费者消费
4.是否自动删除,最后一个消费者断开连接之后,该队列是否自动删除消息,true自动删除消息
5.其他参数,关于延迟消息,死信消息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发消息 本次用控制台输入
Scanner scanner=new Scanner(System.in);
String message="";
while(scanner.hasNext()){
message=scanner.nextLine();
/*
信到发送消息 简单模式 暂不考虑交换机
1.发送到哪个交换机
2.表示路由的Key 本次是队列名
3.其他参数信息
4.发送消息的消息体(二进制)
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("消息发送完毕");
}
}
}
消费者1 和 消费者2 里面睡一会 分别代表处理消息的快慢
public class Work3 {//消息在手动应答时是不丢失的,出现问题会放回队列中重新消费
//队列名称
private static final String QUEUE_NAME="ack_queue";
public static void main(String[] args) throws Exception {
//获取信道
Channel channel = RabbitMQUtils.getChannel();
// 消费者成功接收到消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//表示接收消息的过程
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Work3接收到的消息: " + message+",接下来手动应答");
//手动应答
/*
1.消息的标记
2.是否批量应答(一般处理一个应答一个,不进行批量应答信道消息)
false:不批量应答信道中的消息 true:批量应答信道中所有消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("Work3已应答MQ处理完毕");
};
//4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
CancelCallback cancelCallback= consumerTag->{
System.out.println("Work3消费者取消消费,接口回调");
System.out.println("Work3消息接收(消费)被中断......");
};
/**
* 消费者消费消息 有自动应答 手动应答的方法
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true表示自动应答 false手动应答
* 3.消费者成功接收到消息的回调
* 4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
*/
System.out.println("Work3等待接收消息");
//采用手动应答
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME, autoAck,deliverCallback,cancelCallback);
}
}
public class Work4 {//消息在手动应答时是不丢失的,出现问题会放回队列中重新消费
//队列名称
private static final String QUEUE_NAME="ack_queue";
public static void main(String[] args) throws Exception {
//获取信道
Channel channel = RabbitMQUtils.getChannel();
// 消费者成功接收到消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//表示接收消息的过程
try {
Thread.sleep(22000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Work4接收到的消息: " + message+",接下来手动应答");
//手动应答
/*
1.消息的标记
2.是否批量应答(一般处理一个应答一个,不进行批量应答信道消息)
false:不批量应答信道中的消息 true:批量应答信道中所有消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("Work4已应答MQ处理完毕");
};
//4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
CancelCallback cancelCallback= consumerTag->{
System.out.println("Work4消费者取消消费,接口回调");
System.out.println("Work4消息接收(消费)被中断......");
};
/**
* 消费者消费消息 有自动应答 手动应答的方法
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true表示自动应答 false手动应答
* 3.消费者成功接收到消息的回调
* 4.消费者取消消费的回调 就是被中断,如果正常接收 该回调无意义
*/
System.out.println("Work4等待接收消息");
//采用手动应答
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME, autoAck,deliverCallback,cancelCallback);
}
}
先启动生产者
启动之后,从RabbitMQ的管理界面可以看到有这么一个队列
下面启动两个消费者
可以自己测试 因为是轮询的,第一个给A 第二个肯定给B 当B处理时间过长,挂掉了(比如我们结束该main方法,消息会给A)
RabbitMQ消息持久化
默认情况下RabbitMQ退出或由于某种原因奔溃时,会忽视队列和消息.除非告诉它不要这样做
确保消息不丢失需要做两件事:将队列和消息都标记为持久化(两个都是在生产者修改)
1.队列持久化
此时如果在上面的代码上修改会报错,意思是:原先声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列
可以在管理界面删除原先队列
这样原先队列删除了
再次运行
管理端可以看到该队列持久化标记
2.消息持久化
注意:将消息标记为持久化并不能完全保证消息不丢失,尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在一种情况:当消息刚准备存储到磁盘,但是还没有存储完毕,挂了.但是对于我们的简单任务队列而言.已经足够了.如果需要更加强有力的持久化策略.往下看(发布确认)
设置生产者发送消息为持久化消息(要求保存到磁盘上)
3.不公平分发(建议使用,能者多劳.嗤之以鼻,不屑)
默认情况下RabbitMQ分发消息采用的轮询分发,但是在某些场景中,这种策略并不是很好,例如两个消费者A和B,A的处理速度非常快,而B的处理速度相对较慢.这样情况我们可以设置参数进行修改
不公平分发需要修改消费者代码.
注意:多个消费者都要加上这个代码
4.预取值
两个消费者都设置成1 按完成快慢 但是也并非固定
就是两个消费者A和B,指定A分多少 B分多少 单词叫prefetch
这样的话7条消息就会给消费者C1 两条 给消费者C2 五条
发布确认
1.发布确认原理
简单讲:生成者发消息给MQ(1.设置要求队列持久化,2.设置要求队列中的消息持久化),MQ将队列和消息持久化之后,回来告诉生产者,我保存了.这样保证生产者把消息给MQ的过程中消息不丢失.这就是发布确认 注意注意 和是否持久化无关 这里只是说持久化的时候 不持久化,发布确认的意思也是要回来告诉生产者
默认情况下是不开启的
2.发布确认策略
就是说我发布确认,到底是发一条你给我确认一条还是说发一批你给我确认一批还是说异步确认
开启发布确认的方法
1.单个确认发布
这是一种简单的确认方式,是同步确认发布的方式,就是说一个消息被确认发布后,后续的消息才能陆续发布,如果指定时间范围内这个消息没有被确认将抛出异常
缺点:由于同步关系,且要一一回复,导致速度变慢.如果一个消息没有确认发布会阻塞后续的消息发布.这种方式最多提供每秒不超过数百条发布消息的吞吐量,当然对于某些应用来说,已经足够了
2.批量确认发布
与单个等待确认相比,批量发送消息然后一起确认可以极大的提高吞吐量.缺点是当发生故障导致发布出现问题时,不知道是哪个消息出现问题.我们必须整个批处理保存在内存中,以记录重要信息后重新发布消息.批量确认发布也是同步的,也一样有阻塞问题
3.异步批量确认发布
异步确认虽然在编程逻辑上比上面两个要复杂,但是性价比最高,无论是可靠性还是是效率,它是利用回调函数来达到消息可靠性传递的,通过函数回调来保证是否投递成功
简单讲:不像上面两种,发一个或者发一批,通知你,成功了没,一应一答得方式.而异步批量确认发布是你把东西都给我尽管给我,我回头告诉你哪些收到了,哪些没收到
package confirm1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;
import utils.RabbitMQUtils;
import java.util.UUID;
/**
* @author hrui
* @date 2024/2/28 18:07
*/
public class ConfirmMessage {//验证发布确认模式
//批量发消息的个数
public static final int MESSAGE_COUNT=1000;
public static void main(String[] args) throws Exception {
/**
* 发布确认
* 通过使用的时间,比较确认哪种方式是最好的
* 1.单个确认
* 2.批量确认
* 3.异步批量确认
*/
//ConfirmMessage.publishMessageIndividually();//发布1000条单独确认,耗时:14838毫秒
//ConfirmMessage.publishMessageBath();//发布1000条批量确认,耗时:231毫秒
ConfirmMessage.publishMessageAsync();//发布1000条异步批量确认,耗时:34毫秒
}
//单个确认
public static void publishMessageIndividually() throws Exception {
//获取信道
Channel channel = RabbitMQUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
//发布确认和是否持久化无关
//1.队列名称 2.队列里面的消息是否持久化(磁盘) 3.该队列是否排他 4.是否自动删除 5.其他参数,可以是null或者包含更多结构化配置的Map,如关于延迟消息和死信消息的设置。
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发送消息 单个发布确认
for (int i=0;i String message = i + ""; //1.交换机可以""表示默认 2.路由键(Routing Key)队列名称 3.是否持久化消息 4.二进制消息体 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); //单个消息进行发布确认 boolean b = channel.waitForConfirms(); if(b){ System.out.println("消息发布成功"); } } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"条单独确认,耗时:"+(end-begin)+"毫秒"); } //批量确认 public static void publishMessageBath() throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //队列的声明 String queueName = UUID.randomUUID().toString(); //发布确认和是否持久化无关 //1.队列名称 2.队列里面的消息是否持久化(磁盘) 3.该队列是否排他 4.是否自动删除 5.其他参数,可以是null或者包含更多结构化配置的Map,如关于延迟消息和死信消息的设置。 channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量确认消息大小 100条确认一次 int batchSize=100; //批量发送消息 批量发布确认 for (int i=1;i<=MESSAGE_COUNT;i++) { String message = i + ""; //1.交换机可以""表示默认 2.路由键(Routing Key)队列名称 3.是否持久化消息 4.二进制消息体 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); //每100条确认 批量确定一次 if(i%100==0){ //看样子 发布确认是生产者要求MQ返回,这个返回耗时 boolean b = channel.waitForConfirms(); System.out.println("第"+i+"条消息发布确认"); } } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"条批量确认,耗时:"+(end-begin)+"毫秒"); } //异步批量发布确认 public static void publishMessageAsync() throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //队列的声明 String queueName = UUID.randomUUID().toString(); //发布确认和是否持久化无关 //1.队列名称 2.队列里面的消息是否持久化(磁盘) 3.该队列是否排他 4.是否自动删除 5.其他参数,可以是null或者包含更多结构化配置的Map,如关于延迟消息和死信消息的设置。 channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //消息发布确认成功回调函数 ConfirmCallback ackCallback=(x,y)->{ System.out.println("确认的消息编号:"+x); }; //消息发布确认失败回调函数 //第一个参数为消息的标记 第二个为是否都批量确认 ConfirmCallback nackCallback=(x,y)->{ System.out.println("未确认的消息编号:"+x); }; //用异步确认:就不需要手动得去等待,而是MQ会回调告诉我们哪些成功哪些不成功,我们需要一个监听器 //准备消息得监听器 监听哪些消息成功了,哪些失败了 要将监听器放在发送的前面addConfirmListener有两个重载的方法 //一个只有一个参数,是只监听成功的 另外一个是既可以监听成功的也可以监听失败的 channel.addConfirmListener(ackCallback,nackCallback);//异步的 //开始时间 long begin = System.currentTimeMillis(); for (int i=1;i<=MESSAGE_COUNT;i++) { String message = "消息"+i; //1.交换机可以""表示默认 2.路由键(Routing Key)队列名称 3.是否持久化消息 4.二进制消息体 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"条异步批量确认,耗时:"+(end-begin)+"毫秒"); } } 4.如何处理异步未确认消息 最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列中,比如说用ConcurrentLinkedQueue(并发链路队列)这个队列在confirm(指发消息的线程) callbacks(回调的线程)与发布线程之间进行消息的传递 使用步骤:1.记录所有发送的消息 2.在成功回调中将成功的移除 3.剩下的就是未确认的消息 //异步批量发布确认 public static void publishMessageAsync() throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //队列的声明 String queueName = UUID.randomUUID().toString(); //发布确认和是否持久化无关 //1.队列名称 2.队列里面的消息是否持久化(磁盘) 3.该队列是否排他 4.是否自动删除 5.其他参数,可以是null或者包含更多结构化配置的Map,如关于延迟消息和死信消息的设置。 channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); /** * 线程安全有序的一个哈希表 其实就是个Map 适用于高并发的情况 * 功能:轻松将序号与消息进行关联 轻松批量删除条目(只要给到序号) 支持高并发(多线程) */ ConcurrentSkipListMap //消息发布确认成功回调函数 ConfirmCallback ackCallback=(x,y)->{ if(y){//如果是批量 //2.确认发布时,将容器内数据删除,剩下就是未确认发布的消息 ConcurrentNavigableMap longStringConcurrentNavigableMap.clear(); }else{ outstaendingConfirms.remove(x); } System.out.println("确认的消息编号:"+x); }; //消息发布确认失败回调函数 //第一个参数为消息的标记 第二个为是否都批量确认 ConfirmCallback nackCallback=(x,y)->{ //3.打印未确认的消息 String s = outstaendingConfirms.get(x); System.out.println("未确认的消息是:"+s); System.out.println("未确认的消息编号:"+x); }; //用异步确认:就不需要手动得去等待,而是MQ会回调告诉我们哪些成功哪些不成功,我们需要一个监听器 //准备消息得监听器 监听哪些消息成功了,哪些失败了 要将监听器放在发送的前面addConfirmListener有两个重载的方法 //一个只有一个参数,是只监听成功的 另外一个是既可以监听成功的也可以监听失败的 channel.addConfirmListener(ackCallback,nackCallback);//异步的 //开始时间 long begin = System.currentTimeMillis(); for (int i=1;i<=MESSAGE_COUNT;i++) { String message = "消息"+i; //1.交换机可以""表示默认 2.路由键(Routing Key)队列名称 3.是否持久化消息 4.二进制消息体 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); //1.此处记录所有要发送的消息 outstaendingConfirms.put(channel.getNextPublishSeqNo(),message); } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"条异步批量确认,耗时:"+(end-begin)+"毫秒"); } 5.以上3中发布确认速度对比 单独发布消息确认:同步等待确认,简单,但吞吐量有限 批量发布消息确认:批量同步等待确认,简单,合理的吞吐量,一旦出现问题难推断出具体是哪个消息 异步发布消息确认:最佳性能和资源使用,在出现错误的情况下可以很好的控制,实现起来稍微麻烦 可以看到很多消息堆积在MQ 交换机 原先用""表示 默认交换机 上面学习的模式 都是如下 如何一个消息被消费两次呢? 1交换机(exchange)概念 RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列.实际上,通常生产者都不知道这些消息传递到哪个队列中. 生产者只能将消息通过信道发送到交换机.交换机的工作内容:1.接收来自生产者的消息.2.将消息推入队列.交换机的类型决定是将消息放到特定队列中还是应该丢失还是说应该把消息放到许多队列中 2交换机(exchange)类型 总共4中类型 直接(direct)也叫路由类型 主题(topic) 标题(headers) 扇出(fanout) 3无名exchange 前面交换机一致用""空字符串表示 就是无名exchange 就是这个 临时队列 没有持久化 绑定(bindings) bangdings(绑定)就是exchange(交换机)和queue(队列)的绑定关系 交换机扇出(Fanout) 发布订阅模式,它是将接收到的所有消息广播到它知道的所有队列中。 生产者 public class EmitLog {//生产者 //交换机的名称 public static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //选择交换机一个交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner=new Scanner(System.in); String message=""; while(scanner.hasNext()){ message=scanner.nextLine(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息完毕"); } } } 消费者1 public class ReceiveLogs01 {//接收消息 //选择交换机的名称 public static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //选择交换机一个交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //声明一个临时队列 队列名称随机 当消费者断开连接,该队列会自动删除 会返回一个队列名 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 1.是队列名 2.是交换机名 3是路由key channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("客户端1等待接收消息....."); //接收消息 DeliverCallback deliverCallback=(x,y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("客户端1接收到的消息:"+s); }; //消费者取消消息时回调接口 CancelCallback cancelCallback=x->{ System.out.println("客户端1接收消息被中断...."); }; //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(queueName, true, deliverCallback,cancelCallback); } } 消费者2 public class ReceiveLogs02 {//接收消息 //选择交换机的名称 public static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //选择交换机一个交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //声明一个临时队列 队列名称随机 当消费者断开连接,该队列会自动删除 会返回一个队列名 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列 1.是队列名 2.是交换机名 3是路由key channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("客户端2等待接收消息....."); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("客户端2接收到的消息:"+s); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("客户端2接收消息被中断...."); }; //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(queueName, true, deliverCallback,cancelCallback); } } 交换机(direct)直接交换机 和扇出(Fanout)区别在于RoutingKey RoutingKey相同就是扇出(Fanout) 不同就是(direct) 比如一个消息,希望C1接收不希望C2接收 这种区别对待 有选择性的交换机 生产者 public class DirectLogs { //交换机的名称 public static final String EXCHANGE_NAME="direct_logs"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //选择交换机一个交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner=new Scanner(System.in); String message=""; while(true){ System.out.println("输入消息:"); message=scanner.nextLine(); System.out.println("输入RoutingKey"); String routingKey=scanner.nextLine(); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息完毕"); } } } 消费者1 public class ReceiveLogsDirect01 { //交换机名称 public static final String EXCHANGE_NAME="direct_logs"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //选择交换机一个交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个队列 channel.queueDeclare("console", false, false, false, null); //绑定交换机和队列 1.是队列名 2.是交换机名 3是路由key channel.queueBind("console", EXCHANGE_NAME, "info"); channel.queueBind("console", EXCHANGE_NAME, "warning"); System.out.println("客户端1等待接收消息....."); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("客户端1接收到的消息:"+s); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("客户端1接收消息被中断...."); }; //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume("console", true, deliverCallback,cancelCallback); } } 消费者2 public class ReceiveLogsDirect02 { //交换机名称 public static final String EXCHANGE_NAME="direct_logs"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //选择交换机一个交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个队列 channel.queueDeclare("disk", false, false, false, null); //绑定交换机和队列 1.是队列名 2.是交换机名 3是路由key channel.queueBind("disk", EXCHANGE_NAME, "error"); System.out.println("客户端2等待接收消息....."); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("客户端2接收到的消息:"+s); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("客户端2接收消息被中断...."); }; //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume("disk", true, deliverCallback,cancelCallback); } } 交换机(Topic)主题交换机 结合了Fanout和Direct,更加灵活,想发给谁就发给谁 通过RoutingKey匹配,好比一个通配符的概念 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct 生产者 public class EmitLogTopic { //交换机的名称 public static final String EXCHANGE_NAME="topic_logs"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //选择交换机一个交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); for(Map.Entry channel.basicPublish(EXCHANGE_NAME, stringStringEntry.getKey(), null, stringStringEntry.getValue().getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息:"+stringStringEntry.getValue()); } // Scanner scanner=new Scanner(System.in); // String message=""; // while(true){ // System.out.println("输入消息:"); // message=scanner.nextLine(); // System.out.println("输入RoutingKey"); // String routingKey=scanner.nextLine(); // channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); // System.out.println("生产者发送消息完毕"); // } } } 消费者1 public class ReceiveLogsTopic01 { //交换机的名称 public static final String EXCHANGE_NAME="topic_logs"; //接收消息 public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //指定交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明队列 String queueName="Q1"; channel.queueDeclare(queueName, false, false, false, null); //绑定 1.队列名 2.交换机名 3.RoutingKey channel.queueBind(queueName,EXCHANGE_NAME , "*.orange.*"); System.out.println("C1等待接收消息"); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("C1接收到的消息:"+s+",绑定的键:"+y.getEnvelope().getRoutingKey()); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("C1接收消息被中断...."); }; //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(queueName, true, deliverCallback,cancelCallback); } } 消费者2 public class ReceiveLogsTopic02 {//声明主题交换机及相关队列 消费者1 //交换机的名称 public static final String EXCHANGE_NAME="topic_logs"; //接收消息 public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //指定交换机名字和类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明队列 String queueName="Q2"; channel.queueDeclare(queueName, false, false, false, null); //绑定 1.队列名 2.交换机名 3.RoutingKey channel.queueBind(queueName,EXCHANGE_NAME , "*.*.rabbit"); channel.queueBind(queueName,EXCHANGE_NAME , "lazy.#");//#代表匹配多个.分割的单词 如"lazy.rabbit", "lazy.orange.rabbit" System.out.println("C2等待接收消息"); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("C2接收到的消息:"+s+",绑定的键:"+y.getEnvelope().getRoutingKey()); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("C2接收消息被中断...."); }; //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(queueName, true, deliverCallback,cancelCallback); } } 死信队列 1.死信的概念 死信:无法被消费的消息. 由于特定原因:producer(生产者)将消息投递到broker(中间件)或者直接到queue里,Consumer(消费者)从queue取出消息进行消费,但是由于某种原因导致queue中的某些消息无法被消费.这样的消息如果没有后续处理就变成了死信,有死信就有了死信队列 2.死信的来源(产生的原因) 1.消息TTL过期 2.队列达到最大长度(队列满了,无法再添加信的数据到MQ中,MQ会采取一定的策略来处理这些无法存储的新消息.最常见的策略是丢弃队列中的旧消息(例如最早进入队列未被消费的消息)来为新消息腾出空间.被丢弃的就消息就成为死信,如果队列配置了死信交换机(DLX),这些死信会被发送到指定的死信交换机,或被路由到特定的死信队列中)-->注意如果MQ采用的是拒绝新消息,因新消息无法根本没办法进队列,更没有机会到死信队列 3.消息被拒绝(NACK):当消费者接收到一条消息后,如果它决定不处理这条消息,并且调用了拒绝(basic.reject或basic.nack)方法拒绝消息,同时设置requeue参数为false,意味着不希望RabbitMQ重新将这条消息放入队列中,那么这条消息就会变成死信。 3.死信实战 1.代码架构图 2.消息TTL过期 首先启动客户端1 public class Consumer01 {//死信队列实战 消费者1 //普通交换机名称 public static final String NORMAL_EXCHANGE="normal_exchange"; //死信交换机名称 public static final String DEAD_EXCHANGE="dead_exchange"; //普通队列名称 public static final String NORMAL_QUEUE="normal_queue"; //死信队列名称 public static final String DEAD_QUEUE="dead_queue"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //指定交换机名字和类型 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //参数 Map //TTL过期时间 一般 生产者者发送消息时指定比较常见 //arguments.put("x-message-ttl", 100000);//单位毫秒 这个过期时间也可以由生产者发送消息时候指定 //正常队列设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","lisi"); //声明队列 1.队列名称 2.持久化 3.排他性 4.自动删除 5.其他参数(可以通过这个参数设置队列的TTL(Time-To-Live),最大长度等) channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定 1.队列名 2.交换机名 3.RoutingKey //绑定普通交换机与普通队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE , "zhangsan"); //绑定死信的交换机与死信队列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE , "lisi"); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("C1接收到的消息:"+s+",绑定的键:"+y.getEnvelope().getRoutingKey()); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("C1接收消息被中断...."); }; System.out.println("C1等待接收消息....."); //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(NORMAL_QUEUE, true, deliverCallback,cancelCallback); } } 下面将客户端1关闭 开启生产者 由于客户端1已经关闭 TTL时间为10秒,10秒后这些消息就会到死信队列 public class Producer {//死信队列演示 之生产者 //普通交换机名称 public static final String NORMAL_EXCHANGE="normal_exchange"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //设置TTL时间 AMQP.BasicProperties properties= new AMQP.BasicProperties().builder().expiration("10000").build();//10秒 for(int i=1;i<11;i++){ String message="info"+i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8)); } } } 由于客户端已经关闭 10秒后 下面开启C2对死信队列中的消息进行消费 public class Consumer02 { //消费者从队列中接收消息时,并不需要直接指定交换机。这是因为消息的路由(即从生产者到队列的过程) // 是通过交换机进行的,但一旦消息到达队列,消费者就直接从队列中获取消息,而与交换机无关。 //死信队列名称 public static final String DEAD_QUEUE="dead_queue"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("C2接收到的消息:"+s+",绑定的键:"+y.getEnvelope().getRoutingKey()); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("C2接收消息被中断...."); }; System.out.println("C2等待接收消息....."); //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(DEAD_QUEUE, true, deliverCallback,cancelCallback); } } 一启动,马上接收到消息 3.队列达到最大长度 设置队列长度 public class Consumer01 {//死信队列实战 消费者1 //普通交换机名称 public static final String NORMAL_EXCHANGE="normal_exchange"; //死信交换机名称 public static final String DEAD_EXCHANGE="dead_exchange"; //普通队列名称 public static final String NORMAL_QUEUE="normal_queue"; //死信队列名称 public static final String DEAD_QUEUE="dead_queue"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //指定交换机名字和类型 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //参数 Map //TTL过期时间 一般 生产者者发送消息时指定比较常见 //arguments.put("x-message-ttl", 100000);//单位毫秒 这个过期时间也可以由生产者发送消息时候指定 //正常队列设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","lisi"); //设置队列长度限制 arguments.put("x-max-length", 6); //声明队列 1.队列名称 2.持久化 3.排他性 4.自动删除 5.其他参数(可以通过这个参数设置队列的TTL(Time-To-Live),最大长度等) channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定 1.队列名 2.交换机名 3.RoutingKey //绑定普通交换机与普通队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE , "zhangsan"); //绑定死信的交换机与死信队列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE , "lisi"); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("C1接收到的消息:"+s+",绑定的键:"+y.getEnvelope().getRoutingKey()); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("C1接收消息被中断...."); }; System.out.println("C1等待接收消息....."); //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(NORMAL_QUEUE, true, deliverCallback,cancelCallback); } } 因为队列属性变了,需要在界面把原先队列删除掉 启动客户端1 可以看到下图, 然后关闭客户端1 假死 启动生产者 public class Producer {//死信队列演示 之生产者 //普通交换机名称 public static final String NORMAL_EXCHANGE="normal_exchange"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //设置TTL时间 // AMQP.BasicProperties properties= // new AMQP.BasicProperties().builder().expiration("10000").build();//10秒 for(int i=1;i<11;i++){ String message="info"+i; //channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes(StandardCharsets.UTF_8)); } } } 可以看到 只能6个 剩下4个在死信队列 4.消息被拒 可以启动消费者1和消费者2将上面普通队列和死信队列中的消息消费掉,下面演示消息被拒 先删除队列,因为属性又要变了 启动客户端1 要开启手动应答,如果是自动应答就不存在拒绝 public class Consumer01 {//死信队列实战 消费者1 //普通交换机名称 public static final String NORMAL_EXCHANGE="normal_exchange"; //死信交换机名称 public static final String DEAD_EXCHANGE="dead_exchange"; //普通队列名称 public static final String NORMAL_QUEUE="normal_queue"; //死信队列名称 public static final String DEAD_QUEUE="dead_queue"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //指定交换机名字和类型 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //参数 Map //TTL过期时间 一般 生产者者发送消息时指定比较常见 //arguments.put("x-message-ttl", 100000);//单位毫秒 这个过期时间也可以由生产者发送消息时候指定 //正常队列设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","lisi"); //设置队列长度限制 //arguments.put("x-max-length", 6); //声明队列 1.队列名称 2.持久化 3.排他性 4.自动删除 5.其他参数(可以通过这个参数设置队列的TTL(Time-To-Live),最大长度等) channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定 1.队列名 2.交换机名 3.RoutingKey //绑定普通交换机与普通队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE , "zhangsan"); //绑定死信的交换机与死信队列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE , "lisi"); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); //演示拒绝消息 if(s.equals("info5")){//让info5成为死信 //第一个参数是标签 第二个表示放不放回队列(false表示不放回普通队列,成死信) channel.basicReject(y.getEnvelope().getDeliveryTag(), false); System.out.println("C1接收到:"+s+",此消息被拒绝"); }else{ System.out.println("C1接收消息:"+s); channel.basicAck(y.getEnvelope().getDeliveryTag(), false); } //如下打印还是会显示被拒绝的消息 //System.out.println("C1接收到的消息:"+s+",绑定的键:"+y.getEnvelope().getRoutingKey()); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("C1接收消息被中断...."); }; System.out.println("C1等待接收消息....."); //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(NORMAL_QUEUE, false, deliverCallback,cancelCallback);//开启手动应答 } } 启动生产者 public class Producer {//死信队列演示 之生产者 //普通交换机名称 public static final String NORMAL_EXCHANGE="normal_exchange"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //设置TTL时间 // AMQP.BasicProperties properties= // new AMQP.BasicProperties().builder().expiration("10000").build();//10秒 for(int i=1;i<11;i++){ String message="info"+i; //channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes(StandardCharsets.UTF_8)); } } } 可以点进去看看是不是info5 开启客户端2消费死信 public class Consumer02 { //消费者从队列中接收消息时,并不需要直接指定交换机。这是因为消息的路由(即从生产者到队列的过程) // 是通过交换机进行的,但一旦消息到达队列,消费者就直接从队列中获取消息,而与交换机无关。 //死信队列名称 public static final String DEAD_QUEUE="dead_queue"; public static void main(String[] args) throws Exception { //获取信道 Channel channel = RabbitMQUtils.getChannel(); //接收消息 DeliverCallback deliverCallback=(x, y)->{ String s=new String(y.getBody(),"UTF-8"); System.out.println("C2接收到的消息:"+s+",绑定的键:"+y.getEnvelope().getRoutingKey()); }; //消费者取消消息时回调接口 CancelCallback cancelCallback= x->{ System.out.println("C2接收消息被中断...."); }; System.out.println("C2等待接收消息....."); //1.消费哪个队列 2.是否自动应答 3.消费者成功接收到消息的回调 4.消费者取消消费的回调 就是被中断 channel.basicConsume(DEAD_QUEUE, true, deliverCallback,cancelCallback); } } 被消费 延迟队列 1.延迟队列概念 延迟队列其实是死信队列的一种,消息的TTL过期就是延迟队列 2.延迟队列使用场景 1.订单在十分钟之内未支付则自动取消 2.新创建的店铺,如果在10天内没有上传过商品,则自动发送消息提醒 3.用户注册成功后,如果三天内没有登录则进行短信提醒 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员 5.预定会议后,需要在预定的时间点前10分钟通知各个参会人员参加会议等等 整合SpringBoot(延迟队列) 引入lombok spring web rabbitMQ fastjson swagger rabbittest xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> application.properties 添加Swagger配置类 @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig(){ return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo(){ return new ApiInfoBuilder() .title("rabbitmq 接口文档") .description("本文档描述了 rabbitmq 微服务接口定义") .version("1.0") .contact(new Contact("hrui", "http://xxxx.com", "376084295@qq.com")) .build(); } } 队列TTL 代码架构图 创建两个队列QA和QB,两个队列TTL分别设置为10S和40秒,然后创建一个交换机X和死新交换机Y,他们的类型都是direct,创建一个死信队列QD,他们的绑定关系如下: 在SpringBoot中,交换机,队列 绑定关系 都需要在一个配置类中声明 package com.example.springbootrabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author hrui * @date 2024/2/29 7:00 */ @Configuration public class TtlQueueConfig { //普通交换机的名称 public static final String X_EXCHANGE = "X"; //普通队列名称 public static final String QUEUE_A = "QA"; //普通队列名称 public static final String QUEUE_B = "QB"; //死信交换机名称 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //死信队列名称 public static final String DEAD_LETTER_QUEUE = "QD"; // 声明 xExchange @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } // 声明 xExchange @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明队列 A ttl 为 10s 并绑定到对应的死信交换机 @Bean("queueA") public Queue queueA(){ Map //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } // 声明队列 A 绑定 X 交换机 @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //声明队列 B ttl 为 40s 并绑定到对应的死信交换机 @Bean("queueB") public Queue queueB(){ Map //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } //声明队列 B 绑定 X 交换机 @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } //声明死信队列 QD @Bean("queueD") public Queue queueD(){ return new Queue(DEAD_LETTER_QUEUE); } //声明死信队列 QD 绑定关系 @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } } 接收前端发送请求 并做为生产者发送给MQ package com.example.springbootrabbitmq.controller; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * @author hrui * @date 2024/2/29 8:01 */ @RestController @RequestMapping("ttl") @Slf4j @AllArgsConstructor public class SendMsgController {//发送延迟消息 private final RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("当前时间:{},送法一条消息给两个TTL队列:{}",new Date().toString(),message); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的队列"); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40S的队列"); } } 消费者监听 package com.example.springbootrabbitmq.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.Date; /** * @author hrui * @date 2024/2/29 8:20 */ @Component @Slf4j public class DeadLetterQueueConsumer {//队列TTL消费者 //接收消息 @RabbitListener(queues = "QD")//表示监听QD队列 public void receiveD(Message message, Channel channel) throws UnsupportedEncodingException { String msg=new String(message.getBody(),"UTF-8"); log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg); } } 启动SpringBoot 访问:localhost:8081/ttl/sendMsg/哈哈哈哈 上面例子中存在一个问题,前端发送后,该消息分别给两个队列10S后,40秒后变成死信队列,这样一个延迟队列就打造完成,但是如果时间上加个新需求,就要新增一个队列,这样的话要创建很多队列 延迟队列优化 新增一个队列QC,绑定关系如下,该队列不设置TTL时间 QC的TTL时间在生产者给MQ发送消息时候指定 也就是说本来就应该这么去干 配置类中加以下代码 //普通队列名称 public static final String QUEUE_C = "QC"; @Bean("queueC") public Queue queueC(){ Map //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL //args.put("x-message-ttl", 40000);//不设置TTL时间 生产者指定 return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } //声明队列 C 绑定 X 交换机 @Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queue1C).to(xExchange).with("XC"); } 生产者代码 @RestController @RequestMapping("ttl") @Slf4j @AllArgsConstructor public class SendMsgController {//发送延迟消息 private final RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("当前时间:{},送法一条消息给两个TTL队列:{}",new Date().toString(),message); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的队列:"+message); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40S的队列:"+message); rabbitTemplate.convertAndSend("X", "XC", "消息来自ttl为5S的队列:"+message, x->{ x.getMessageProperties().setExpiration("5000");//这个参数也可以前端传进来 return x; }); } } 启动测试 localhost:8081/ttl/sendMsg/哈哈哈 延迟队列基于死信存在的问题 @RestController @RequestMapping("ttl") @Slf4j @AllArgsConstructor public class SendMsgController {//发送延迟消息 private final RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("当前时间:{},送法一条消息给两个TTL队列:{}",new Date().toString(),message); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的队列:"+message); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40S的队列:"+message); rabbitTemplate.convertAndSend("X", "XC", "消息来自ttl为50S的队列:"+message, x->{ x.getMessageProperties().setExpiration("5000");//这个参数也可以前端传进来 return x; }); } @GetMapping("/sendMsg/{message}/{ttl}") public void sendMsg(@PathVariable String message,@PathVariable String ttl){ log.info("当前时间:{},送法一条消息给两个TTL队列:{}",new Date().toString(),message); rabbitTemplate.convertAndSend("X", "XC", "消息来指定ttl为"+ttl+"的队列:"+message, x->{ x.getMessageProperties().setExpiration(ttl); return x; }); } } 出现的问题是 时间有点乱TTL 就是说TTL 2秒 和TTL20秒 所用的时候可能超过20秒 就是说来的时候是一起来的 死信队列的缺陷:RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延长时间很长,而第二个消息的延长时间很短,第二个消息并不会优先得到执行 延迟队列基于插件解决上述问题 安装延时队列插件 需要安装插件rabbitmq_delayed_message_exchange Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub 将文件拷贝到容器内 docker cp /usr/local/develop/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins/ 进入rabbitmq容器实例 docker exec -it rabbitmq /bin/bash cd plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange 注意:确保你的容器是后台启动的 不然docker stop rabbitmq的话就找不到运行过的容器了 如果你前台启动那只能重装了 docker restart rabbitmq 现在就好了 有明显的时间顺序 插件安装完成后 会多这么个玩意 基于插件的延迟队列 安装玩插件之后多了一个延迟类型 新增配置文件 @Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } //自定义交换机 我们在这里定义的是一个延迟交换机 @Bean public CustomExchange delayedExchange() { Map //自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } } 控制器代码 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @GetMapping("sendDelayMsg/{message}/{delayTime}") public void sendMsgA(@PathVariable String message,@PathVariable Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> { correlationData.getMessageProperties().setDelay(delayTime); return correlationData; }); log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message); } 消费者代码 @Component @Slf4j public class DeadLetterQueueConsumer {//队列TTL消费者 //接收消息 @RabbitListener(queues = "QD")//表示监听QD队列 public void receiveD(Message message, Channel channel) throws UnsupportedEncodingException { String msg=new String(message.getBody(),"UTF-8"); log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg); } @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayedQueue(Message message){ String msg = new String(message.getBody()); log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg); } } 发布高级确认 在生产环境中由于一些不明原因,导致RabbitMQ重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复.特别是在一些极端情况下RabbitMQ集群不可用的时候,无法投递的消息如何处理? SpringBoot发布确认 就是说交换机收不到 甚至没有回调 怎么办 配置类 @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; //声明业务 Exchange @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } // 声明确认队列 @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } // 声明确认队列绑定关系 @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key1"); } } 生产者 @RestController @RequestMapping("/confirm") @Slf4j public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyCallBack myCallBack; //依赖注入 rabbitTemplate 之后再设置它的回调对象 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(myCallBack); } @GetMapping("sendMessage/{message}") public void sendMessage(@PathVariable String message){ //指定消息 id 为 1 CorrelationData correlationData1=new CorrelationData("1"); String routingKey="key1"; rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1); CorrelationData correlationData2=new CorrelationData("2"); routingKey="key2"; rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2); log.info("发送消息内容:{}",message); } } 消费者 @Component @Slf4j public class ConfirmConsumer { public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @RabbitListener(queues =CONFIRM_QUEUE_NAME) public void receiveMsg(Message message){ String msg=new String(message.getBody()); log.info("接受到队列 confirm.queue 消息:{}",msg); } } 回调 @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback { /** * 交换机不管是否收到消息的一个回调方法 * CorrelationData * 消息相关数据 * ack * 交换机是否收到消息 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id=correlationData!=null?correlationData.getId():""; if(ack){ log.info("交换机已经收到 id 为:{}的消息",id); }else{ log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause); } } } 精彩链接
发表评论