目录

一、消息的可靠投递1.提供者代码实现2.消息的可靠投递小结3.Consumer Ack

一、消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

confirm 确认模式return 退回模式

rabbitmq 整个消息投递的路径为:

producer—>rabbitmq broker—>exchange—>queue—>consumer

消息从 producer 到 exchange 则会返回一个 confirmCallback 。消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

1.提供者代码实现

1.创建项目 mq1(提供者) 2.pom

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

org.springframework.boot

spring-boot-starter-parent

2.3.6.RELEASE

4.0.0

mq1

15

15

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-test

3.在resource 文件夹下面添加 配置文件 rabbitmq.yml

spring:

rabbitmq:

host: 192.168.121.140

port: 5672

username: admin

password: admin

virtual-host: /

publisher-confirm-type: correlated

publisher-returns: true

4.队列和交互机绑定

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitMQConfig {

public static final String EXCHANGE_NAME = "boot_topic_exchange2";

public static final String QUEUE_NAME = "boot_queue66";

// 1 交换机

@Bean("bootExchange")

public Exchange bootExchange(){

return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();

}

//2.Queue 队列

@Bean("bootQueue")

public Queue bootQueue(){

return QueueBuilder.durable(QUEUE_NAME).build();

}

//3. 队列和交互机绑定关系 Binding

/*

1. 知道哪个队列

2. 知道哪个交换机

3. routing key

noargs():表示不指定参数

*/

@Bean

public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,

@Qualifier("bootExchange") Exchange exchange){

return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();

}

}

5.创建入口类

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class ProducerApplication {

public static void main(String[] args) {

SpringApplication.run(ProducerApplication.class);

}

}

确认模式

import com.donglin.test.RabbitMQConfig;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest

@RunWith(SpringRunner.class)

public class ProducerTest {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 确认模式:

* 步骤:

* 1. 确认模式开启:yml中publisher-confirm-type: correlated

* 2. 在rabbitTemplate定义ConfirmCallBack回调函数

*/

@Test

public void testConfirm() {

//2. 定义回调

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if (ack){

//接收成功

System.out.println("接收成功消息");

}else {

//接收失败

System.out.println("接收失败消息" + cause);

//做一些处理,让消息再次发送。

}

}

});

//3. 发送消息

rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "confirm", "消息");//成功

//rabbitTemplate.convertAndSend("test_exchange_confirm000", "confirm", "message confirm....");//失败

}

}

退回模式

import com.donglin.test.RabbitMQConfig;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest

@RunWith(SpringRunner.class)

public class ProducerTest {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack

* 步骤:

* 1. 开启回退模式:publisher-returns="true"

* 2. 设置ReturnCallBack

* 3. 设置Exchange处理消息的模式:

* 1). 如果消息没有路由到Queue,则丢弃消息(默认)

* 2). 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack

* rabbitTemplate.setMandatory(true);

*/

@Test

public void testReturn() {

//设置交换机处理失败消息的模式

rabbitTemplate.setMandatory(true);

//2.设置ReturnCallBack

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

/**

* @param message 消息对象

* @param replyCode 错误码

* @param replyText 错误信息

* @param exchange 交换机

* @param routingKey 路由键

*/

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

System.out.println("return 执行了....");

System.out.println(message);

System.out.println(replyCode);

System.out.println(replyText);

System.out.println(exchange);

System.out.println(routingKey);

//处理

}

});

//3. 发送消息

// rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "confirm", "message confirm....");

rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "confirm111", "message confirm....");

}

}

2.消息的可靠投递小结

设置开启确认模式

spring:

rabbitmq:

publisher-confirm-type: correlated

使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。设置退回模式。

spring:

rabbitmq:

publisher-returns: true

使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage

3.Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。 有二种确认方式: 自动确认:默认 手动确认:

spring:

rabbitmq:

acknowledge-mode: manual

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1.创建项目 mq2(消费者) 2.pom

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

org.springframework.boot

spring-boot-starter-parent

2.3.6.RELEASE

4.0.0

mq2

15

15

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-test

3.在 resource 文件夹下面新建 application.yml文件 记得是simple,使用direct要报错

spring:

rabbitmq:

host: 192.168.121.140

port: 5672

username: admin

password: admin

virtual-host: /

listener:

simple:

#表示手动确认

acknowledge-mode: manual

4.启动类

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class ProducerApplication {

public static void main(String[] args) {

SpringApplication.run(ProducerApplication.class);

}

}

自动确认

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageListener;

import org.springframework.stereotype.Component;

@Component

public class AckListener implements MessageListener {

@RabbitListener(queues = "boot_queue66")

@Override

public void onMessage(Message message) {

System.out.println(new String(message.getBody()));

}

}

手动确认 添加监听器

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

/**

* Consumer ACK机制:

* 1. 设置手动签收。acknowledge="manual"

* 2. 让监听器类实现ChannelAwareMessageListener接口

* 3. 如果消息成功处理,则调用channel的 basicAck()签收

* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer

*/

@Component

public class AckListener1 implements ChannelAwareMessageListener {

@RabbitListener(queues = "boot_queue66")

@Override

public void onMessage(Message message, Channel channel) throws Exception {

Thread.sleep(1000);

// 获取消息传递标记

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

// ① 接收消息

System.out.println(new String(message.getBody()));

// ② 处理业务逻辑

System.out.println("处理业务逻辑");

int i = 3/0;//出现错误

// ③ 手动签收

/**

* 第一个参数:表示收到的标签

* 第二个参数:如果为true表示可以签收所有的消息

*/

channel.basicAck(deliveryTag,true);

} catch (Exception e) {

e.printStackTrace();

// ④ 拒绝签收

/*

第三个参数:requeue:重回队列。

设置为true,则消息重新回到queue,broker会重新发送该消息给消费端

*/

channel.basicNack(deliveryTag,true,true);

}

}

}

添加测试类

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest

@RunWith(SpringRunner.class)

public class ConsumerTest {

@Test

public void test(){

while (true){

}

}

}

Consumer Ack 小结 none:自动确认,manual:手动确认 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,true);方法确认签收消息 如果出现异常,则在catch中调用 basicNack,拒绝消息,让MQ重新发送消息。

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: