一、基本概念讲解
如上图所示,RabbitMQ中有几个最基本的概念,Broker、Exchange、Bindings、Queue
Broker是RabbitMQ的消息代理服务器,主要负责接收对于发送到当前MQ的所有Message,只要是发送到当前MQ的Message,都会先由Broker接收下来再进行其他操作。
Exchange 是消息的路由器,会根据一定的规则将消息路由到一个或多个队列中
Binding即 Exchange 和 Queue 之间的关联关系。它定义了 Exchange 如何将消息路由到与之绑定的队列中。例如myExchange与Queue1有着关联就要通过Binding进行绑定,不能平白无故的直接关联。同时Binding还要记录exchange与queue的路由规则,也就是什么样的路由规则才会通过这个exchange到达queue。
Queue是消息的存储位置,它用于保存待处理的消息。消费者订阅队列,以便从队列中获取消息并进行处理。队列保证了消息的顺序处理和可靠存储。
流程:消息发送到MQ,Broker先接收下来再分配到消息所指定到达的exchange,再由exchange按照Binding中的routing-key(路由规则)来将消息路由到对应的queue
二、SpringBoot整合RabbitMQ
添加依赖:
在application.properties中添加RabbitMQ的基本配置
spring.rabbitmq.host=192.168.231.133
spring.rabbitmq.port=5672
# 若新建了其他虚拟主机,则可以根据需要填写指定虚拟的地址
spring.rabbitmq.virtual-host=/
在启动类上添加@EnableRabbit注解
至此,项目中可以通过@AutoWired进行RabbitMQ中的各种类的操作了。这里主要介绍两个操作类 RabbitTemplate、AmqpAdmin
注意:只要引入了amqp依赖starter,则RabbitAutoConfiguration就会自动生效,这个配置类给容器中自动配置了RabbitTemplate AmqpAdmin CachingConnectionFactory RabbitMessagingTemplate等,这些是可以通过@Autowired拿来直接使用的
三、AmqpAdmin基本用法
创建交换机Exchange
//这是全部的构造参数DirectExchange(String name, boolean durable, boolean autoDelete, Map
/**
* name(必填参数):这是交换机的名称,用于标识交换机。每个交换机都必须有一个唯一的名称,以便在 RabbitMQ 中进行识别。
*
* durable(可选参数):这是一个布尔值,用于指示交换机是否持久化。如果设置为 true,则表示交换机是持久化的,即在 RabbitMQ 服务器重启后仍然存在。如果设置为 false,则表示交换机是非持久化的。
*
* autoDelete(可选参数):这是一个布尔值,用于指示交换机是否自动删除。如果设置为 true,则表示当没有队列与该交换机绑定时,交换机将自动删除。如果设置为 false,则表示交换机不会自动删除。
*
* internal(可选参数):这是一个布尔值,用于指示交换机是否是内部的。内部交换机只能被交换机绑定,不能被队列绑定。通常情况下,这个参数保持默认值 false,表示交换机是外部的。
*
* arguments(可选参数):这是一个 Map
*/
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
创建队列
//这是全部的构造参数Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map
/**
* name(必填参数):这是队列的名称,用于唯一标识队列。每个队列都必须具有一个唯一的名称,以便在 RabbitMQ 中进行识别。
*
* durable(可选参数):这是一个布尔值,用于指示队列是否持久化。如果设置为 true,则表示队列是持久化的,即在 RabbitMQ 服务器重启后仍然存在。如果设置为 false,则表示队列是非持久化的。
*
* exclusive(可选参数):这是一个布尔值,用于指示队列是否是独占的。如果设置为 true,则表示该队列只能被当前连接的消费者独占使用,其他连接的消费者无法访问。通常情况下,设置为 false。
*
* autoDelete(可选参数):这是一个布尔值,用于指示队列是否自动删除。如果设置为 true,则表示当没有消费者连接到队列时,队列将自动删除。如果设置为 false,则表示队列不会自动删除。
*
* arguments(可选参数):这是一个 Map
*/
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
创建绑定关系
//这是全部的构造参数Binding(String destination, DestinationType destinationType, String exchange, String routingKey,Map
/**
* destination(必填参数):这是绑定的目标,可以是队列的名称或交换机的名称,取决于 destinationType 参数的值。如果 destinationType 设置为 DestinationType.QUEUE,则 destination 应该是队列的名称;如果 destinationType 设置为 DestinationType.EXCHANGE,则 destination 应该是交换机的名称。
*
* destinationType(必填参数):这是一个枚举类型的参数,用于指示 destination 参数表示的是队列还是交换机。可以使用 DestinationType.QUEUE 表示队列,或使用 DestinationType.EXCHANGE 表示交换机。
*
* exchange(必填参数):这是要绑定到的交换机的名称。绑定关系将会建立在这个交换机和目标(队列或交换机)之间。通常,你需要指定一个已存在的交换机名称。
*
* routingKey(必填参数):这是路由键,用于确定消息在交换机和目标之间的路由方式。具体的路由逻辑取决于交换机的类型和配置。路由键通常与消息的路由策略相关。
*
* arguments(可选参数):这是一个 Map
*/
Binding binding = new Binding(
"hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null
);
amqpAdmin.declareBinding(binding);
四、RabbitTemplate基本用法
使用@RabbitListener注解进行接收消息
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, Entity entity, Channel channel){
byte[] body = message.getBody();//消息体
MessageProperties messageProperties = message.getMessageProperties();//消息头
System.out.println("接受到的消息..."+message+"===内容"+entity);
}
推荐链接
发表评论