RabbitMq的确认机制和延时通知

一、消息发送确认

在RabbitConfig中两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;

1、交换机确认:ConfirmCallback方法

ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

我们需要在生产者的配置中添加下面配置,表示开启发布者确认。

spring.rabbitmq.publisher-confirm-type=correlated # 新版本

spring.rabbitmq.publisher-confirms=true # 老版本

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。

correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。ack:消息投递到broker 的状态,true表示成功。cause:表示投递失败的原因。

2、队列确认:ReturnCallback方法

交换机接收到消息后可以判断当前的路径发送没有问题,但是不能保证消息能够发送到路由队列的。而发送者是不知道这个消息有没有送达队列的,因此,我们需要在队列中进行消息确认。这就是回退消息。

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

添加以下配置:

spring.rabbitmq.publisher-returns=true

3、消息发送确认代码实现

在rabbitConfig中实现接口

package com.it520.bookkeeping.config;

import org.springframework.amqp.core.ReturnedMessage;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @Author : huliupan

* @CreateTime : 2022-10-13

* @Description : RabbitMQ的config

**/

@Configuration

public class RabbitConfig {

@Bean

public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){

RabbitTemplate rabbitTemplate = new RabbitTemplate();

rabbitTemplate.setConnectionFactory(connectionFactory);

//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);

System.out.println("ConfirmCallback: "+"确认情况:"+ack);

System.out.println("ConfirmCallback: "+"原因:"+cause);

}

});

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {

@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

System.out.println("ReturnCallback: "+"消息:"+returnedMessage.getMessage());

System.out.println("ReturnCallback: "+"回应码:"+returnedMessage.getReplyCode());

System.out.println("ReturnCallback: "+"回应信息:"+returnedMessage.getReplyText());

System.out.println("ReturnCallback: "+"交换机:"+returnedMessage.getExchange());

System.out.println("ReturnCallback: "+"路由键:"+returnedMessage.getRoutingKey());

}

});

return rabbitTemplate;

}

}

4、回调的触发情况

那么以上这两种回调函数都是在什么情况会触发呢?

①消息推送到server,但是在server里找不到交换机 ②消息推送到server,找到交换机了,但是没找到队列 ③消息推送到server,交换机和队列啥都没找到 ④消息推送成功

那么我先写几个接口来分别测试和认证下以上4种情况,消息确认触发回调函数的情况:

消息推送到server,但是在server里找不到交换机 写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):

@GetMapping("/TestMessageAck")

public String TestMessageAck() {

String messageId = String.valueOf(UUID.randomUUID());

String messageData = "message: non-existent-exchange test message ";

String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

Map map = new HashMap<>();

map.put("messageId", messageId);

map.put("messageData", messageData);

map.put("createTime", createTime);

rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);

return "ok";

}

调用接口,查看rabbitmq-provuder项目的控制台输出情况(原因里面有说,没有找到交换机’non-existent-exchange’):

ConfirmCallback: 相关数据:null

ConfirmCallback: 确认情况:false

ConfirmCallback: 原因:channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

2022-10-14 16:15:58.721 ERROR 4868 --- [.98.153.34:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

​ 结论: ①这种情况触发的是 ConfirmCallback 回调函数。

消息推送到server,找到交换机了,但是没找到队列 这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:

@Bean

DirectExchange lonelyDirectExchange() {

return new DirectExchange("lonelyDirectExchange");

}

然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):

@GetMapping("/TestMessageAck2")

public String TestMessageAck2() {

String messageId = String.valueOf(UUID.randomUUID());

String messageData = "message: lonelyDirectExchange test message ";

String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

Map map = new HashMap<>();

map.put("messageId", messageId);

map.put("messageData", messageData);

map.put("createTime", createTime);

rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);

return "ok";

}

调用接口,查看rabbitmq-provuder项目的控制台输出情况:

ConfirmCallback: 相关数据:null

ConfirmCallback: 确认情况:true

ConfirmCallback: 原因:null

ReturnCallback: 消息:(Body:'{createTime=2022-10-14 16:17:47, messageId=c001cbbe-7792-465f-b6bd-cb6f3bdde27b, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

ReturnCallback: 回应码:312

ReturnCallback: 回应信息:NO_ROUTE

ReturnCallback: 交换机:lonelyDirectExchange

ReturnCallback: 路由键:TestDirectRouting

可以看到这种情况,两个函数都被调用了; 这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true; 而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。 结论:这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

消息推送到sever,交换机和队列啥都没找到

@GetMapping("/TestMessageAck2")

public String TestMessageAck2() {

String messageId = String.valueOf(UUID.randomUUID());

String messageData = "message: lonelyDirectExchange test message ";

String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

Map map = new HashMap<>();

map.put("messageId", messageId);

map.put("messageData", messageData);

map.put("createTime", createTime);

rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);

return "ok";

}

返回结果:和没有交换机的一样

022-10-14 16:19:11.882 ERROR 4868 --- [.98.153.34:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

ConfirmCallback: 相关数据:null

ConfirmCallback: 确认情况:false

ConfirmCallback: 原因:channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

消息推送成功

那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendFanoutMessage接口,可以看到控制台输出:

ConfirmCallback: 相关数据:null

ConfirmCallback: 确认情况:true

ConfirmCallback: 原因:null

结论: 这种情况触发的是 ConfirmCallback 回调函数。

二、消息消费确认机制

Springboot 的确认模式有三种,配置如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

NONE : 不确认 :

1、默认所有消息消费成功,会不断的向消费者推送消息2、因为 rabbitmq 认为所有消息都被消费成功。所以队列中存在丢失消息风险。 AUTO:自动确认

1、根据消息处理逻辑是否抛出异常自动发送 ack(正常)和nack(异常)给服务端,如果消费者本身逻辑没有处理好这条数据就存在丢失消息的风险。2、使用自动确认模式时,需要考虑的另一件事情就是消费者过载。 MANUAL:手动确认

1、手动确认在业务失败后进行一些操作,消费者调用 ack、nack、reject 几种方法进行确认,如果消息未被 ACK 则发送到下一个消费者或重回队列。2、ack 用于肯定确认;nack 用于 否定确认 ;reject 用于否定确认(一次只能拒绝单条消息)

1、自动确认

自动确认是指消费者在消费消息的时候,当消费者收到消息后,消息就会被 RabbitMQ 从队列中删除掉。这种模式认为 “发送即成功”。这是不安全的,因为消费者可能在业务中并没有成功消费完就中断了

2、手动确认

手动确认又分为肯定确认和否定确认。

2.1 basicAck 方法(肯定确认)

basicAck 方法用于确认当前消息,Channel 类中的 basicAck 方法定义如下:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

参数说明:

long deliveryTag:唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。

2.2 basicNack 方法(否定确认)

basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

参数说明:

long deliveryTag:唯一标识 ID。

boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。

boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。

2.3 basicReject 方法(否定确认)

basicReject 方法用于明确拒绝当前的消息而不是确认。 RabbitMQ 在 2.0.0 版本开始引入 Basic.Reject 命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

Channel 类中的basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

参数说明:

long deliveryTag:唯一标识 ID。

boolean requeue:上面已经解释。

利用之前的Fanout交换机的消息发送来测试消息确认

package com.it520.bookkeeping.receiver;

import com.it520.bookkeeping.config.FanoutRabbitConfig;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;

import java.io.IOException;

import java.io.ObjectInputStream;

import java.util.Map;

import java.util.concurrent.ScheduledThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* @Author : huliupan

* @CreateTime : 2022/10/13

* @Description :Fanout交换机的消费者

**/

@Component

public class FanoutReceiver {

@RabbitHandler

@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_A)

public void processA(Map testMessage, Message message, Channel channel) throws IOException, ClassNotFoundException {

String consumerQueueName = message.getMessageProperties().getConsumerQueue();

long deliveryTag = message.getMessageProperties().getDeliveryTag();

if (FanoutRabbitConfig.FANOUT_QUEUE_A.equals(consumerQueueName)) {

/**

* 确认消息,参数说明:

* long deliveryTag:唯一标识 ID。

* boolean multiple:是否批处理,当该参数为 true 时,

* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。

*/

channel.basicAck(deliveryTag, true);

System.out.println("fanout.a收到肯定确认了" + deliveryTag);

}

}

@RabbitHandler

@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_B)

public void processB(Map testMessage, Message message, Channel channel) throws IOException {

String consumerQueueName = message.getMessageProperties().getConsumerQueue();

long deliveryTag = message.getMessageProperties().getDeliveryTag();

if (FanoutRabbitConfig.FANOUT_QUEUE_B.equals(consumerQueueName)) {

/**

* 否定消息,参数说明:

* long deliveryTag:唯一标识 ID。

* boolean multiple:是否批处理,当该参数为 true 时,

* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。

* boolean requeue:如果 requeue 参数设置为 true,

* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;

* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,

* 而不会把它发送给新的消费者。

*/

channel.basicNack(deliveryTag, true, false);

System.out.println("fanout.B收到否定确认了" + deliveryTag + "未重新放入队列 ");

}

}

@RabbitHandler

@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)

public void processC(Map testMessage, Message message, Channel channel) throws IOException, InterruptedException {

String consumerQueueName = message.getMessageProperties().getConsumerQueue();

long deliveryTag = message.getMessageProperties().getDeliveryTag();

Thread.sleep(5000);

if (FanoutRabbitConfig.FANOUT_QUEUE_C.equals(consumerQueueName)) {

/**

* 拒绝消息,参数说明:

* long deliveryTag:唯一标识 ID。

* boolean requeue:如果 requeue 参数设置为 true,

* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;

* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,

* 而不会把它发送给新的消费者。

*/

channel.basicReject(deliveryTag, true);

System.out.println("fanout.C收到否定确认了" + deliveryTag + "重新放入队列 ");

}

}

/**

* Fanout.c的QUEUE的第二个消费者,避免无限循环

* @param testMessage

* @param message

* @param channel

* @throws IOException

* @throws InterruptedException

*/

@RabbitHandler

@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)

public void processC2(Map testMessage, Message message, Channel channel) throws IOException, InterruptedException {

String consumerQueueName = message.getMessageProperties().getConsumerQueue();

long deliveryTag = message.getMessageProperties().getDeliveryTag();

Thread.sleep(5000);

if (FanoutRabbitConfig.FANOUT_QUEUE_C.equals(consumerQueueName)) {

channel.basicAck(deliveryTag, true);

System.out.println("fanout.C2收到keng定确认了" + deliveryTag );

}

}

}

返回的结果

fanout.a收到肯定确认了1

fanout.B收到否定确认了1未重新放入队列

fanout.C收到否定确认了1重新放入队列

fanout.C2收到keng定确认了1

processC和processC2随机消费fanout.C队列,多个消费者避免一个消费者重新放入队列造成的循环

三、实现延时队列

RabbitMQ本身是不支持延时队列的,但是延时队列的使用还是很广泛的,例如一个订单下单之后有30分钟的支付时间,30分钟后要对订单的支付状态进行判断,这时候就需要用到延时队列。

目前基于RabbitMq实现延时队列的方法有两种:

1、死信队列

特性1、Time To Live(TTL)

RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

A: 通过队列属性设置,队列中所有消息都有相同的过期时间。 B: 对消息进行单独设置,每条消息TTL可以不同。 如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

特性2、Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchangex-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

队列出现dead letter的情况有:

消息或者队列的TTL过期队列达到最大长度消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

综合上述两个特性,设置了TTL规则之后当消息在一个队列中变成死信时,利用DLX特性它能被重新转发到另一个Exchange或者Routing Key,这时候消息就可以重新被消费了。

死信队列的配置

package com.it520.bookkeeping.config;

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

/**

* @Author : huliupan

* @CreateTime : 2022/10/13

* @Description :直连交换机的配置

**/

@Configuration

public class DirectRabbitConfig {

public static final String DIRECT_EXCHANGE_NAME = "TestDirectExchange";

public static final String DIRECT_QUEUE_NAME = "TestDirectQueue";

public static final String DIRECT_QUEUE_NAME1 = "TestDirectQueue1";

public static final String DIRECT_EXCHANGE_ROUTE_KEY = "TestDirectRouting";

public static final String DIRECT_DELAY_EXCHANGE_NAME = "TestDirectDelayExchange";

public static final String DIRECT_DELAY_QUEUE_NAME = "TestDirectDelayQueue";

public static final String DIRECT_DELAY_QUEUE_NAME1 = "TestDirectDelayQueue1";

public static final String DIRECT_DELAY_EXCHANGE_ROUTE_KEY = "TestDirectDelayRouting";

//队列 起名:TestDirectQueue

//声明用于失效的队列的延时失效属性,并绑定到对应的死信交换机

@Bean

public Queue TestDirectQueue() {

Map args = new HashMap<>(2);

// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机

args.put("x-dead-letter-exchange", DIRECT_DELAY_EXCHANGE_NAME);

// x-dead-letter-routing-key 这里声明当前队列的死信路由key

args.put("x-dead-letter-routing-key", DIRECT_DELAY_EXCHANGE_ROUTE_KEY);

// x-message-ttl 声明队列的TTL

args.put("x-message-ttl", 1000 * 10);

return new Queue(DIRECT_QUEUE_NAME, true, false, false, args);

}

//队列 起名:TestDirectQueue1

@Bean

public Queue TestDirectQueue1() {

Map args = new HashMap<>(2);

// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机

args.put("x-dead-letter-exchange", DIRECT_DELAY_EXCHANGE_NAME);

// x-dead-letter-routing-key 这里声明当前队列的死信路由key

args.put("x-dead-letter-routing-key", DIRECT_DELAY_EXCHANGE_ROUTE_KEY);

// x-message-ttl 声明队列的TTL

args.put("x-message-ttl", 1000 * 5);

return new Queue(DIRECT_QUEUE_NAME1, true, false, false, args);

}

//Direct交换机 起名:TestDirectExchange

@Bean

DirectExchange TestDirectExchange() {

// return new DirectExchange("TestDirectExchange",true,true);

return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);

}

//声明延时交换机

@Bean

DirectExchange createDelayExchange() {

return new DirectExchange(DIRECT_DELAY_EXCHANGE_NAME, true, false);

}

//声明死信队列

@Bean

Queue delayQueue() {

return new Queue(DIRECT_DELAY_QUEUE_NAME, true);

}

//声明死信队列

@Bean

Queue delayQueue1() {

return new Queue(DIRECT_DELAY_QUEUE_NAME1, true);

}

//延时交换机和死信队列绑定

@Bean

Binding bindingDelayQueue() {

return BindingBuilder.bind(delayQueue()).to(createDelayExchange()).with(DIRECT_DELAY_EXCHANGE_ROUTE_KEY);

}

//延时交换机和死信队列绑定

@Bean

Binding bindingDelayQueue1() {

return BindingBuilder.bind(delayQueue1()).to(createDelayExchange()).with(DIRECT_DELAY_EXCHANGE_ROUTE_KEY);

}

//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting

@Bean

Binding bindingDirect() {

return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(DIRECT_EXCHANGE_ROUTE_KEY);

}

@Bean

Binding bindingDirect1() {

return BindingBuilder.bind(TestDirectQueue1()).to(TestDirectExchange()).with(DIRECT_EXCHANGE_ROUTE_KEY);

}

@Bean

DirectExchange lonelyDirectExchange() {

return new DirectExchange("lonelyDirectExchange");

}

}

消息发送者;

@GetMapping("/sendDirectMessage")

public String sendDirectMessage() {

String messageData = "test message, hello!";

//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange

rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE_NAME, DirectRabbitConfig.DIRECT_EXCHANGE_ROUTE_KEY, messageData);

return "ok";

}

死信消息的接收者

@RabbitHandler

@RabbitListener(queues = DirectRabbitConfig.DIRECT_DELAY_QUEUE_NAME)//监听的队列名称 TestDirectDelayQueue

public void processDelay(String testMessage, Message message , Channel channel) throws IOException {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

System.out.println("死信消息properties" + message.getMessageProperties());

System.out.println("DirectReceiver Delay消费者收到消息 : " + testMessage.toString());

channel.basicAck(deliveryTag , false);

}

@RabbitHandler

@RabbitListener(queues = DirectRabbitConfig.DIRECT_DELAY_QUEUE_NAME1)//监听的队列名称 TestDirectDelayQueue1

public void processDelay1(String testMessage, Message message , Channel channel) throws IOException {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

System.out.println("死信消息properties" + message.getMessageProperties());

System.out.println("DirectReceiver Delay1消费者收到消息 : " + testMessage.toString());

channel.basicAck(deliveryTag , false);

}

2、RabbitMq的插件

插件实现参考:https://blog.csdn.net/u014308482/article/details/53036770

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: