1.RabbitMQ安装

RabbitMQ保姆级安装:Linux Centos8系统

2.RabbitMQ可视化页面翻译

RabbitMQ可视化页面使用(中文翻译)

3.消息推送接收

3.1.流程图

3.2.常用三种交换机

1.Direct Exchange 直连型交换机,根据消息携带的路由键将消息投递给对应队列。 大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。 然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列, routing key必须相等。 直连交换机是一对一,如果配置多台监听绑定到同一个直连交换的同一个队列会轮询的方式对消息进行消费,而且不存在重复消费。

1.Fanout Exchange 扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的 所有队列。

2.Topic Exchange 主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。 topic 的模式匹配包括两个通配符:#和* ,其中 # 匹配 0 或多个单词, * 匹配一个单词 (必须出现的)。

4.Springboot+RabbitMQ

4.1.项目初始化

我们可以先注释已写的消费者(不然瞬间消费控制台看不到效果),使用同一个项目模拟,发送完消息后,再重新启动放开进行消费,这样就不用再跑一个项目进行模拟调用了,完美。

4.1.1.pom引入

测试Springboot版本:3.0.1

org.springframework.boot

spring-boot-starter-amqp

4.1.2.yml参数配置

RabbitMQ 参数配置说明

# Spring

spring:

# rabbitmq 配置

rabbitmq:

addresses: 139.x.xxx.xxx

port: 5672

username: admin

password: 123456

listener:

simple:

#最小消费者数量

concurrency: 10

#最大的消费者数量

max-concurrency: 10

4.2.项目实战

4.2.1.Direct Exchange

4.2.1.1.DirectRabbitController

package com.cn.controller.test.rabbitmq;

import cn.hutool.core.date.DateUtil;

import com.alibaba.fastjson.JSON;

import com.cn.common.AjaxResult;

import io.swagger.v3.oas.annotations.tags.Tag;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/**

* @date 2023/9/20

* @description 发送消息

*/

@Tag(name = "RabbitMQ")

@RestController

@Slf4j

@RequestMapping("test/rabbitmq")

public class DirectRabbitController{

@Autowired

RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法

@GetMapping("/sendDirectMsg")

public AjaxResult sendDirectMsg() {

String msgId = String.valueOf(UUID.randomUUID());

String msmData = "test msg:" + Math.random();

String createTime = DateUtil.now();

Map map = new HashMap<>();

map.put("msgId", msgId);

map.put("msmData", msmData);

map.put("createTime", createTime);

String payload = JSON.toJSONString(map);

// 如果写的是默认交换机,可以不传交换机名称

// rabbitTemplate.convertAndSend("TestDirectRouting", payload);

// 将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange

rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", payload);

return AjaxResult.success();

}

@GetMapping("/sendDirectMsg2")

public AjaxResult sendDirectMsg2() {

System.out.println("sendDirectMsg2准备发送");

String msgId = String.valueOf(UUID.randomUUID());

String msmData = "test msg:" + Math.random();

String createTime = DateUtil.now();

Map map = new HashMap<>();

map.put("msgId2", msgId);

map.put("msmData2", msmData);

map.put("createTime2", createTime);

String payload = JSON.toJSONString(map);

// 1.由于bindingDirect2绑定交换机使用的默认交换机,使用withQueueName表示用和队列名称相同的键值对

// rabbitTemplate.convertAndSend("TestDirectQueue2", payload);

// 2.将消息携带绑定键值:TestDirectRouting2 发送到交换机TestDirectExchange2

// rabbitTemplate.convertAndSend("TestDirectExchange2", "TestDirectRouting2", payload);

rabbitTemplate.convertAndSend("TestDirectExchange2", "TestDirectRouting2", payload);

return AjaxResult.success();

}

}

4.2.1.2.DirectRabbitConfig

package com.cn.controller.test.rabbitmq;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @date 2023/9/20

* @description 配置类

*/

@Configuration

public class DirectRabbitConfig {

// 队列 起名:TestDirectQueue

@Bean

public Queue TestDirectQueue() {

// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效

// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable

// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。

// return new Queue("TestDirectQueue",true,true,false);

// 一般设置一下队列的持久化就好,其余两个就是默认false

return new Queue("TestDirectQueue", true);

}

// Direct交换机 name:TestDirectExchange,durable:是否持久化,autoDelete:是否自动删除

@Bean

DirectExchange TestDirectExchange() {

// return new DirectExchange("TestDirectExchange",true,true);

return new DirectExchange("TestDirectExchange", true, false);

}

// 绑定,将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting

@Bean

Binding bindingDirect() {

return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");

}

// 队列2 起名:TestDirectQueue2

@Bean

public Queue TestDirectQueue2() {

return new Queue("TestDirectQueue2");

}

// 交换机2

@Bean

DirectExchange TestDirectExchange2() {

return new DirectExchange("TestDirectExchange2");

}

// 绑定交换机2(使用默认交换机配置)

@Bean

Binding bindingDirect2() {

// 1.默认配置,routingKey:使用withQueueName表示用和队列名称相同的键值对(这种写法是错误的“默认交换机不允许设置routingKey。默认交换机是自动的,不需要声明交换机也不需要指定路由key。在发送消息时,交换机给空值,路由key给声明的队列名就可以了”)

// return BindingBuilder.bind(TestDirectQueue2()).to(DirectExchange.DEFAULT).with("TestDirectRouting2"); 错误的写法

// 2.使用默认交换机不允许设置routingKey,故使用默认配置“withQueueName”

// return BindingBuilder.bind(TestDirectQueue2()).to(DirectExchange.DEFAULT).withQueueName();

// 3.使用交换机2

return BindingBuilder.bind(TestDirectQueue2()).to(TestDirectExchange2()).with("TestDirectRouting2");

}

}

4.2.1.3.DirectReceiverConsumer

package com.cn.controller.test.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/**

* @date 2023/9/20

* @description 消费者

*/

@Component

// 监听也可以放下面,写多个监听

// @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue

public class DirectReceiverConsumer {

@RabbitListener(queues = "TestDirectQueue")// 监听的队列名称 TestDirectQueue

@RabbitHandler

public void process1_1(String msg) {

System.out.println("DirectReceiverConsumer:1_1消费者收到(TestDirectQueue)消息 : " + msg);

}

@RabbitListener(queues = "TestDirectQueue")// 监听的队列名称 TestDirectQueue

@RabbitHandler

public void process1_2(String msg) {

System.out.println("DirectReceiverConsumer:1_2消费者收到(TestDirectQueue)消息 : " + msg);

}

@RabbitListener(queues = "TestDirectQueue2")// 监听的队列名称 TestDirectQueue2

@RabbitHandler

public void process2(String msg) {

System.out.println("DirectReceiverConsumer消费者收到(TestDirectQueue2)消息 : " + msg);

}

}

4.2.1.4.发送和接受消息

4.2.1.4.1.查询MQ控制台已有交换机和队列

4.2.1.4.2.使用Apifox模拟发送请求

两个消息同时发送100条消息。 消息1有两个消费者,消息2一个消费者。

4.2.1.4.3.消费者进行消费

4.2.2.Fanout Exchange

4.2.2.1.FanoutRabbitController

package com.cn.controller.test.rabbitmq;

import cn.hutool.core.date.DateUtil;

import com.alibaba.fastjson.JSON;

import com.cn.common.AjaxResult;

import io.swagger.v3.oas.annotations.tags.Tag;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/**

* @date 2023/9/20

* @description Fanout交换机 发送消息

*/

@Tag(name = "RabbitMQ")

@RestController

@Slf4j

@RequestMapping("test/rabbitmq")

public class FanoutRabbitController {

@Autowired

RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法

@GetMapping("/sendFanoutMsg")

public AjaxResult sendFanoutMsg() {

String msgId = String.valueOf(UUID.randomUUID());

String msmData = "test msg:" + Math.random();

String createTime = DateUtil.now();

Map map = new HashMap<>();

map.put("msgId", msgId);

map.put("msmData", msmData);

map.put("createTime", createTime);

String payload = JSON.toJSONString(map);

// 因为是扇型交换机, 路由键无需配置,配置也不起作用

rabbitTemplate.convertAndSend("fanoutExchange", null, payload);

return AjaxResult.success();

}

}

4.2.2.2.FanoutRabbitConfig

package com.cn.controller.test.rabbitmq;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @date 2023/9/21

* @description Fanout交换机配置类

*/

@Configuration

public class FanoutRabbitConfig {

/**

* 创建三个队列 :fanout1 fanout2 fanout3

* 将三个队列都绑定在交换机 fanoutExchange 上

* 因为是扇型交换机, 路由键无需配置,配置也不起作用

*/

@Bean

public Queue queue1() {

return new Queue("fanout1");

}

@Bean

public Queue queue2() {

return new Queue("fanout2");

}

@Bean

public Queue queue3() {

return new Queue("fanout3");

}

// 交换机

@Bean

FanoutExchange fanoutExchange() {

return new FanoutExchange("fanoutExchange");

}

@Bean

Binding bindingExchange1() {

return BindingBuilder.bind(queue1()).to(fanoutExchange());

}

@Bean

Binding bindingExchange2() {

return BindingBuilder.bind(queue2()).to(fanoutExchange());

}

@Bean

Binding bindingExchange3() {

return BindingBuilder.bind(queue3()).to(fanoutExchange());

}

}

4.2.2.3.FanoutReceiverConsumer

package com.cn.controller.test.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

/**

* @date 2023/9/20

* @description 消费者

*/

@Component

public class FanoutReceiverConsumer {

@RabbitListener(queues = "fanout1")

@RabbitHandler

public void process1(@Payload String msg) {

System.out.println("FanoutReceiverConsumer1:fanout1消息 : " + msg);

}

@RabbitListener(queues = "fanout2")

@RabbitHandler

public void process2(@Payload String msg) {

System.out.println("FanoutReceiverConsumer2:fanout2消息 : " + msg);

}

@RabbitListener(queues = "fanout3")

@RabbitHandler

public void process3(@Payload String msg) {

System.out.println("FanoutReceiverConsumer3:fanout3消息 : " + msg);

}

}

4.2.2.4.发送和接受消息

4.2.2.4.1.查询MQ控制台已有交换机和队列

4.2.2.4.2.使用Apifox模拟发送请求

4.2.2.4.3.消费者进行消费

Fanout123都消费完毕

4.2.3.Topic Exchange

4.2.3.1.TopicRabbitController

package com.cn.controller.test.rabbitmq;

import cn.hutool.core.date.DateUtil;

import com.alibaba.fastjson.JSON;

import com.cn.common.AjaxResult;

import io.swagger.v3.oas.annotations.tags.Tag;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/**

* @date 2023/9/20

* @description Topic交换机 发送消息

*/

@Tag(name = "RabbitMQ")

@RestController

@Slf4j

@RequestMapping("test/rabbitmq")

public class TopicRabbitController {

@Autowired

RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法

@GetMapping("/sendTopicMsg1")

public AjaxResult sendTopicMsg1() {

String msgId = String.valueOf(UUID.randomUUID());

String msmData = "test msg:" + Math.random();

String createTime = DateUtil.now();

Map map = new HashMap<>();

map.put("msgId", msgId);

map.put("msmData", msmData);

map.put("createTime", createTime);

String payload = JSON.toJSONString(map);

rabbitTemplate.convertAndSend("topicExchange", "topic.key1", payload);

return AjaxResult.success();

}

@GetMapping("/sendTopicMsg2")

public AjaxResult sendTopicMsg2() {

String msgId = String.valueOf(UUID.randomUUID());

String msmData = "test msg:" + Math.random();

String createTime = DateUtil.now();

Map map = new HashMap<>();

map.put("msgId", msgId);

map.put("msmData", msmData);

map.put("createTime", createTime);

String payload = JSON.toJSONString(map);

rabbitTemplate.convertAndSend("topicExchange", "topic.key2", payload);

return AjaxResult.success();

}

}

4.2.3.2.TopicRabbitConfig

package com.cn.controller.test.rabbitmq;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @date 2023/9/21

* @description Topic交换机配置类

*/

@Configuration

public class TopicRabbitConfig {

// 绑定键

public final static String key1 = "topic.key1";

public final static String key2 = "topic.key2";

@Bean

public Queue Queue1() {

return new Queue(key1);

}

@Bean

public Queue Queue2() {

return new Queue(key2);

}

@Bean

TopicExchange topicExchange() {

return new TopicExchange("topicExchange");

}

// 将Key1Queue和topicExchange绑定,而且绑定的键值为topic.key1

// 这样只要是消息携带的路由键是topic.key1,才会分发到该队列

@Bean

Binding bindingExchangeMessage() {

return BindingBuilder.bind(Queue1()).to(topicExchange()).with(key1);

}

// 将Key2Queue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#

// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列

@Bean

Binding bindingExchangeMessage2() {

return BindingBuilder.bind(Queue2()).to(topicExchange()).with("topic.#");

}

}

4.2.3.3.TopicRabbitConfig

package com.cn.controller.test.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

/**

* @author Yph

* @date 2023/9/20

* @description 消费者

*/

@Component

public class TopicReceiverConsumer {

@RabbitListener(queues = "topic.key1")

@RabbitHandler

public void process1(@Payload String msg) {

System.out.println("TopicReceiverConsumer1:topic.key1消息 : " + msg);

}

@RabbitListener(queues = "topic.key2")

@RabbitHandler

public void process2(@Payload String msg) {

System.out.println("TopicReceiverConsumer2:topic.key2消息 : " + msg);

}

}

4.2.3.4.发送和接受消息

4.2.3.4.1.查询MQ控制台已有交换机和队列

4.2.3.4.2.使用Apifox模拟发送请求

1.先发送消息1 2.运行消费者进行消费 key1和key2都消费完毕。

3.送消息2

key2才接收到消息,key2绑定的键是“topic.#”,key1绑定键是“topic.key1”。 故而key1接收不到键为“topic.key2”的消息,key2可以匹配接收到。

4.2.4.生产者消息确认回调

其实就是消息确认(生产者推送消息成功,消费者接收消息成功)

4.2.4.1.CallBackRabbitConfig配置类

package com.cn.controller.test.rabbitmq;

import org.jetbrains.annotations.NotNull;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.ReturnedMessage;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @date 2023/9/21

* @description 消息回调

* 生产者消息确认回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)

*/

@Configuration

public class CallBackRabbitConfig {

@Bean

public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {

RabbitTemplate rabbitTemplate = new RabbitTemplate();

rabbitTemplate.setConnectionFactory(connectionFactory);

// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);

System.out.println("ConfirmCallback: " + "确认情况:" + b);

System.out.println("ConfirmCallback: " + "原因:" + s);

}

});

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {

@Override

public void returnedMessage(@NotNull ReturnedMessage returnedMessage) {

System.out.println("ReturnCallback: " + "消息:" + returnedMessage.getMessage());

System.out.println("ReturnCallback: " + "回应码:" + returnedMessage.getReplyCode());

System.out.println("ReturnCallback: " + "回应信息:" + returnedMessage.getReplyText());

System.out.println("ReturnCallback: " + "交换机:" + returnedMessage.getExchange());

System.out.println("ReturnCallback: " + "路由键:" + returnedMessage.getRoutingKey());

}

});

return rabbitTemplate;

}

}

4.2.4.1.发送消息确认消息

出现下面这种情况,表示消费者正常消费消费

4.2.4.1.其他情况

4.2.5.消费者消息确认机制

消费者消息确认机制参考文章:https://blog.csdn.net/qq_35387940/article/details/100514134

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: