为什么要有延迟队列? 延迟消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。 使用场景: 短信通知:下单成功后60s之后给用户发送短信通知。 失败重试:业务操作失败后,间隔一定的时间进行失败重试。 实现方式 1:Time To Live(TTL)、Dead Letter Exchanges(DLX) RabbitMQ 提供了过期时间 TTL 机制,可以设置消息在队列中的存活时长。在消息到达过期时间时,会从当前队列中删除,并被 RabbitMQ 自动转发到对应的死信队列中。 然后再来消费该死信队列,这样就可以实现一个延迟队列的效果 2:利用 RabbitMQ 中的插件 x-delay-message 以下为实现过程 一:使用TTL的方式实现

===========================》配置类

@Configuration

public class DirectExchangeConfiguration {

/**

* 延迟队列

*

* @return Queue

*/

@Bean

public Queue queueDelay11() {

// Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除

return new Queue(

Message11.QUEUE_DELAY,

true,

false,

false);

}

/**

* 队列,绑定过期时间等

*

* @return Queue

*/

@Bean

public Queue queue11() {

return QueueBuilder

// durable: 是否持久化

.durable(Message11.QUEUE)

// exclusive: 是否排它

.exclusive()

// autoDelete: 是否自动删除

.autoDelete()

// TTL 设置队列里的默认过期时间为 10 秒

.ttl(10 * 1000)

// DLX

.deadLetterExchange(Message11.EXCHANGE)

.deadLetterRoutingKey(Message11.ROUTING_KEY_DELAY)

.build();

}

@Bean

public DirectExchange exchange11() {

// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它

return new DirectExchange(Message11.EXCHANGE,

true,

false);

}

/**

* 创建 Binding

* Exchange:Message11.EXCHANGE

* Routing key:Message11.ROUTING_KEY

* Queue:Message11.QUEUE

*

* @return Binding

*/

@Bean

public Binding binding11() {

return BindingBuilder

.bind(queue11()).to(exchange11())

.with(Message11.ROUTING_KEY);

}

/**

* 绑定延迟队列

*

* @return Binding

*/

@Bean

public Binding bindingDelay11() {

return BindingBuilder

.bind(queueDelay11()).to(exchange11())

.with(Message11.ROUTING_KEY_DELAY);

}

======================》消息实体

@Data

public class Message11 implements Serializable {

/**

* 普通队列

*/

public static final String QUEUE = "QUEUE_11";

/**

* 延迟队列

*/

public static final String QUEUE_DELAY = "QUEUE_DELAY_11";

public static final String EXCHANGE = "EXCHANGE_11";

public static final String ROUTING_KEY = "ROUTING_KEY_11";

public static final String ROUTING_KEY_DELAY = "ROUTING_KEY_DELAY_11";

private String id;

}

=================================》生产者代码

@Component

public class Producer11 {

@Resource

private RabbitTemplate rabbitTemplate;

public void syncSend(String id, int delay) {

Message11 message = new Message11();

message.setId(id);

MessagePostProcessor postProcessor = new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 设置消息的 TTL 过期时间

if (delay > 0) {

message.getMessageProperties().setExpiration(String.valueOf(delay));

}

return message;

}

};

rabbitTemplate.convertAndSend(Message11.EXCHANGE, Message11.ROUTING_KEY, message, postProcessor);

}

}

========================》消费者

@Component

@RabbitListener(queues = Message11.QUEUE_DELAY)

@Slf4j

public class Consumer11 {

@RabbitHandler

public void onMessage(Message11 message) {

log.info("[{}][Consumer11 onMessage][消息内容:{}]", LocalDateTime.now(), message);

}

}

@Test

void syncSend() {

String id = UUID.randomUUID().toString();

int delay = 5000;

producer11.syncSend(id, delay);

log.info("[{}][test producer11 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay, id);

String id2 = UUID.randomUUID().toString();

int delay2 = 2000;

producer11.syncSend(id2, delay2);

log.info("[{}][test producer11 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay2, id2);

// 其实采用 ttl 这种方式会有一个问题,就是当一个队列中有多个不一样的过期时间的消息的时候,会形成阻塞,只有前一个被消费了才会轮到后一个

// 比如先发送了一个延迟20s的消息,后发送了一个延迟为2s的消息,如果第一个消息未到达则后一个消息会被阻塞

new CountDownLatch(1).await();

}

二:使用插件实现 此种方式需要安装mq的延迟第一列插件 安装方式如下连接使用docker安装rabbitMQ的延迟第一列插件

=============================》插件方式配置类

@Configuration

public class PluginsExchangeConfiguration {

@Bean

public Queue queue12() {

// Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除

return new Queue(

Message12.QUEUE,

true,

false,

false);

}

/**

* 创建一个延迟交换机 注意类型为 “x-delayed-message”

*

* @return 交换机

*/

@Bean

public CustomExchange exchange12() {

Map args = new HashMap<>(1);

args.put("x-delayed-type", "direct");

return new CustomExchange(Message12.EXCHANGE, "x-delayed-message", true, false, args);

}

@Bean

public Binding binding12() {

return BindingBuilder

.bind(queue12()).to(exchange12())

.with(Message12.ROUTING_KEY)

.noargs();

}

}

======================================》生产者

@Component

public class Producer12 {

@Resource

private RabbitTemplate rabbitTemplate;

public void syncSend(String id, int delay) {

Message12 message = new Message12();

message.setId(id);

MessagePostProcessor postProcessor = new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 设置过期时间

message.getMessageProperties().setHeader("x-delay", delay);

return message;

}

};

rabbitTemplate.convertAndSend(Message12.EXCHANGE, Message12.ROUTING_KEY, message, postProcessor);

}

}

=============================》消费者

@Component

@RabbitListener(queues = Message12.QUEUE)

@Slf4j

public class Consumer12 {

@RabbitHandler

public void onMessage(Message12 message) {

log.info("[{}][Consumer12 onMessage][消息内容:{}]", LocalDateTime.now(), message);

}

}

@Test

void syncSend() {

String id = UUID.randomUUID().toString();

int delay = 5000;

producer12.syncSend(id, delay);

log.info("[{}][test producer12 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay, id);

String id2 = UUID.randomUUID().toString();

int delay2 = 2000;

producer12.syncSend(id2, delay2);

log.info("[{}][test producer12 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay2, id2);

// 其实采用 ttl 这种方式会有一个问题,就是当一个队列中有多个不一样的过期时间的消息的时候,会形成阻塞,只有前一个被消费了才会轮到后一个

// 比如先发送了一个延迟20s的消息,后发送了一个延迟为2s的消息,如果第一个消息未到达则后一个消息会被阻塞

new CountDownLatch(1).await();

}

以上的是消费者并发消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go 若需完整代码 可识别二维码后 给您发代码。

好文推荐

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: