消费者默认是自动确认的,这是及其不安全的。一般情况下我们都需要手动确认去保证数据的安全性。

消费者确认方式

1.basicAck

basicAck方法是肯定的交付,一般在该消息处理完后执行,该消息才会在队列里面被删除,不然会处于UnAcked的状态存在队列中。

其方法有两个参数:

参数1:消费消息的index

参数2: 是否批量确认消息,前提是在同一个channel里面,且是在该消息确认前没有被确认的消息才能批量确认。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

2.basicReject

basicReject是否定的交付,一般在消费消息时出现异常等的时候执行。可以将该消息丢弃或重排序去重新处理消息

其方法有两个参数:

参数1: 消费消息的index

参数2: 对异常消息的处理,true表示重排序,false表示丢弃

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

3.basicNack

basicNack也是否定的交付,其功能和basicReject是一样的。区别是basicNack比basicReject的功能更强一些。他能够一次丢弃多个或重排序多个消息

其方法有三个参数:

参数1:消费消息的index

参数2:是否批量否定多个消息,设为false就与basicReject功能一样,triue的前提也是在同一个channel,且在该消息否定前存在未确认的消息

参数3: 对异常消息的处理,true表示重排序,false表示丢弃

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);

案例

一.配置文件

#消费者每次消费一个消息

spring.rabbitmq.listener.simple.prefetch=1

#消费者手动答应

spring.rabbitmq.listener.simple.acknowledge-mode=manual

二.创建队列

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitMqConfig {

@Bean

public Queue beanHello() {

return new Queue("beanHello", true, false, false, null);

}

}

三.生产者

import org.slf4j.Logger;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageProperties;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

import java.util.UUID;

@RestController

public class MyRabbitmqController {

@Resource

private RabbitTemplate rabbitTemplate;

private Logger logger = LoggerFactory.getLogger(MyRabbitmqController.class);

@GetMapping("beanHello")

public void hello(String mess){

MessageProperties messageProperties = new MessageProperties();

//设置消息唯一ID

messageProperties.setMessageId(UUID.randomUUID().toString());

Message message = new Message(mess.getBytes(),messageProperties);

logger.info("生产者 消息id:{} ",messageProperties.getMessageId());

rabbitTemplate.convertAndSend("beanHello",message);

}

}

四.消费者

import com.rabbitmq.client.Channel;

import org.slf4j.Logger;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.nio.charset.StandardCharsets;

/**

* @author jitwxs

* @date 2023年02月06日 11:56

*/

@Component

public class MyRabbitListener {

private Logger logger = LoggerFactory.getLogger(MyRabbitmqController.class);

@RabbitListener(queuesToDeclare = @Queue("beanHello"))

public void hello(Message message, Channel channel) throws IOException {

String messageId = message.getMessageProperties().getMessageId();

String s = new String(message.getBody(), StandardCharsets.UTF_8);

logger.info("消费者 消息id:{},消息为:{}",messageId, s);

try {

/*

* 业务代码,

* int n = 0/0 模拟异常

*

* */

int n = 0 / 0;

            //确认消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), fale);

logger.info("ok");

} catch (Exception e) {

            //退回消息,将消息重新放回队列

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

logger.info("error");

}

}

}

五.测试

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: