配置文件

server.port=8081

logging.level.com.chensir=debug

#host

spring.rabbitmq.host=121.40.100.66

#默认5672

spring.rabbitmq.port=5672

#用户名

spring.rabbitmq.username=guest

#密码

spring.rabbitmq.password=guest

#连接到代理时用的虚拟主机

spring.rabbitmq.virtual-host=/

#每个消费者每次可最大处理的nack消息数量 默认是250个 可在服务界面看到;意思是每批投递的数量

spring.rabbitmq.listener.simple.prefetch=1

#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto

spring.rabbitmq.listener.simple.acknowledge-mode=auto

#spring.rabbitmq.listener.simple.acknowledge-mode=manual

#是否开启自动重试 默认为false 不开启

spring.rabbitmq.listener.simple.retry.enabled=true

#最大重试次数

spring.rabbitmq.listener.simple.retry.max-attempts=5

#最大重试时间间隔

spring.rabbitmq.listener.simple.retry.max-interval=20000ms

#重试时间间隔

spring.rabbitmq.listener.simple.retry.initial-interval=2000ms

# 最大重试间隔*乘数

#应用于上一重试间隔的乘数 第一次(重试时间间隔)2s 4s 8s 16s 32s 此处32s>20s 以后都以20s为间隔 总的次数为最大重试次数

spring.rabbitmq.listener.simple.retry.multiplier=2

#尝试次数超过上面的设置之后是否丢弃;默认是true(false不丢弃时需要写相应代码将消息加入死信队列中;与参数acknowledge-mode有关系)

spring.rabbitmq.listener.simple.default-requeue-rejected=false

# 下方配置为生产者确认机制的配置

#mandatory为true开启强制消息投递,若消息未被路由至任何一个queue则回退消息到RabbitTemplate.ReturnCallback中的returnedMessage方法;

spring.rabbitmq.template.mandatory = true

#开启回调机制,交换机发布确认模式;交换机收到或者未收到消息,都会调用回调实现类的回调确认方法;

spring.rabbitmq.publisher-confirm-type=correlated

# 开启回退消息,当交换机无法将消息路由出去,便会将消息回退给生产者

spring.rabbitmq.publisher-returns=true

生产者代码

package com.chensir.provider;

import cn.hutool.json.JSONUtil;

import com.chensir.config.RabbitConfig;

import com.chensir.model.User;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.ReturnedMessage;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.util.UUID;

/**

* 生产者

*/

@Component

@Slf4j

public class DirectProviderOk implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

//@PostConstruct 注解是专门数据初始化的注解, 只有其他组件注入后,初始化方法才会执行,

@PostConstruct

public void init() {

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnsCallback(this);

}

public void sendOk() {

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

System.out.println();

System.out.println("callbackSender UUID: " + correlationData.getId());

User user = User.builder()

.id(1)

.sex("男")

.name("chen")

.build();

String s = JSONUtil.toJsonStr(user);

rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_KEY, RabbitConfig.ROUTING_KEY_03 + 1, s,

m -> {

return m;

}, correlationData);

}

/**

* 交换机确认回调方法

*

* correlationData 保存回调消息的ID及相关信息

* ack 表示交换机是否收到消息

* cause 失败的原因, 成功为null

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

String id = correlationData != null ? correlationData.getId() : "";

if (ack) {

log.info("交换机已经收到了ID为:{}的消息", id);

} else {

log.info("交换机还未收到ID为:{}的消息,由于原因:{}", id, cause);

}

}

/**

* 消息没被交换机成功投递到队列时回调

*

* 可以在当消息传递过程中不可达目的地时将消息返回给生产者,只有不可达目的地的时候 才进行回退,也就是回调

*

* @param returnedMessage

*/

@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

log.info("消息{},被交换机:{}退回,原因是:{},路由key是:{}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());

}

}

结果

未成功投递交换机回调:

callbackSender UUID: 59ea9e71-41b6-4ee7-854d-b41820b47290

2023-08-29 18:52:06.655 ERROR 26064 --- [.40.100.52:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'DirectExchange-011' in vhost '/', class-id=60, method-id=40)

2023-08-29 18:52:06.657 INFO 26064 --- [nectionFactory2] com.chensir.provider.DirectProviderOk : 交换机还未收到ID为:59ea9e71-41b6-4ee7-854d-b41820b47290的消息,由于原因:channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'DirectExchange-011' in vhost '/', class-id=60, method-id=40)

交换机未成功投递队列回调:

callbackSender UUID: d989577a-3fc7-43f0-bc4b-337f8c0bf556

2023-08-29 18:53:14.217 INFO 9324 --- [nectionFactory1] com.chensir.provider.DirectProviderOk : 消息{"id":1,"name":"chen","sex":"男"},被交换机:DirectExchange-01退回,原因是:312,路由key是:NO_ROUTE

2023-08-29 18:53:14.219 INFO 9324 --- [nectionFactory2] com.chensir.provider.DirectProviderOk : 交换机已经收到了ID为:d989577a-3fc7-43f0-bc4b-337f8c0bf556的消息

好文推荐

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: