1.为什么会用到延时队列?

场景: 最近在开发一个系统时遇到这样一个场景:A系统开通套餐时,需要把套餐信息以邮件的形式发送给相关工作人员,经人工审核通过后在B系统里面开通。A系统会调用B系统的套餐列表接口查询套餐是否开通成功:开通成功则由A系统完成订单;假如超过设定时间仍未开通成功,则关闭订单并退费(这种层层嵌套的流程俗称"套娃")。这时候用RabbitMQ的延时队列就可以很好地解决这个问题。

2.为什么会提到多策略?

场景: 假如A系统还有别的功能,同样需要经过人工审核、在B系统中添加成功之后,A系统才会显示添加成功,但是又不想为每个功能都写一套队列、消费者等代码,这时就可以用到策略模式:消费者按消息类型把消息分发给对应的处理类,作用类似于 if … else … 分支判断。

3.进入今天主题

整体流程图:

生产者生产一条延时消息,利用routingkey将消息路由到延时队列;每条消息通过expiration属性携带各自的TTL(即需要延时的时间),延时队列绑定到死信交换机,消息过期后,根据死信routingkey又会被路由到死信队列中,消费者只需要监听死信队列,拿到消息后交给具体的策略实现类进行后续业务处理即可。注意: RabbitMQ只有在消息到达队首时才会检查其是否过期,所以同一个延时队列里如果混用不同的延时时间,延时短的消息可能会被排在前面的、延时长的消息阻塞。

有了这个图,写代码就简单了。先写MQ配置类:声明队列、路由键、交换机之间的绑定关系,以及生产者、消费者、RabbitTemplate等Bean。

RabbitMqConfig

注意: 监听器我也写在了配置类里面,用SimpleMessageListenerContainer这个类来设置要监听的队列: simpleMessageListenerContainer.setQueueNames(DEAD_LETTER_QUEUE_NAME);

package com.king.alice.rabbitmq.config;

import com.king.alice.rabbitmq.delay.consumer.MessageConsumer;

import com.king.alice.rabbitmq.delay.consumer.Strategy;

import com.king.alice.rabbitmq.delay.provider.MessageProvider;

import org.springframework.amqp.core.*;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.beans.factory.ObjectProvider;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

/**

* @Author wlt

* @Description rabbitmq配置类

* @Date 2022/9/4

* @Param

* @return

**/

@Configuration

public class RabbitMqConfig {

public static final String DELAY_EXCHANGE_NAME = "delay.alice.exchange";

public static final String DELAY_QUEUE_NAME = "delay.alice.queue";

public static final String DELAY_QUEUE_ROUTING_KEY = "delay.alice.queue.routing.key";

public static final String DEAD_LETTER_EXCHANGE = "ttl.alice.exchange";

public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "ttl.alice.queue.routing.key";

public static final String DEAD_LETTER_QUEUE_NAME = "ttl.alice.queue";

/**

* 声明延时Exchange

* @return

*/

@Bean("delayExchange")

public DirectExchange delayExchange(){

return new DirectExchange(DELAY_EXCHANGE_NAME);

}

/**

* 功能描述:

* <声明死信Exchange>

*/

@Bean("deadLetterExchange")

public DirectExchange deadLetterExchange(){

return new DirectExchange(DEAD_LETTER_EXCHANGE);

}

/**

* 声明延时队列 并绑定到对应的死信交换机

* @return

*/

@Bean("delayQueue")

public Queue delayQueue(){

Map args = new HashMap<>(2);

// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机

args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);

// x-dead-letter-routing-key 这里声明当前队列的死信路由key

args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);

// x-message-ttl 声明队列的TTL

// args.put("x-message-ttl", 6000);

return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();

}

/**

* 功能描述:

* <声明死信队列用于接收延时处理的消息>

*/

@Bean("deadLetterQueue")

public Queue deadLetterQueue(){

return new Queue(DEAD_LETTER_QUEUE_NAME);

}

/**

* 功能描述:

* <声明延时队列绑定关系>

* @Param:

* @Return:

* @Author: 大魔王

* @Date: 2023/8/15 20:00

*/

@Bean

public Binding delayBinding(@Qualifier("delayQueue") Queue queue,

@Qualifier("delayExchange") DirectExchange exchange){

return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);

}

/**

* 功能描述:

* <声明死信队列A绑定关系>

* @Param:

* @Return:

* @Author: 大魔王

* @Date: 2023/8/15 20:01

*/

@Bean

public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,

@Qualifier("deadLetterExchange") DirectExchange exchange){

return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_ROUTING_KEY);

}

@Bean

@ConditionalOnMissingBean

public MessageProvider messageProvider(@Qualifier("delayRabbitTemplate") RabbitTemplate template) {

return new MessageProvider(template);

}

@Bean

@ConditionalOnMissingBean

public MessageConsumer messageConsumer(ObjectProvider> provider) {

return new MessageConsumer(provider);

}

@Bean

@ConditionalOnMissingBean

public RabbitTemplate delayRabbitTemplate(ConnectionFactory connectionFactory) {

RabbitTemplate template = new RabbitTemplate(connectionFactory);

template.setMessageConverter(new Jackson2JsonMessageConverter());

return template;

}

@Bean

SimpleMessageListenerContainer simpleMessageListenerContainer(MessageConsumer messageConsumer, ConnectionFactory factory) {

SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(factory);

simpleMessageListenerContainer.setQueueNames(DEAD_LETTER_QUEUE_NAME);

simpleMessageListenerContainer.setExposeListenerChannel(true);

simpleMessageListenerContainer.setMessageListener(messageConsumer);

return simpleMessageListenerContainer;

}

public static final String EXCHANGE_NAME = "alice_topic_exchange";

public static final String QUEUE_NAME = "alice_queue";

@Bean("aliceExchange")

public Exchange aliceExchange() {

return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();

}

@Bean("aliceQueue")

public Queue aliceQueue() {

return QueueBuilder.durable(QUEUE_NAME).build();

}

@Bean

public Binding bindQueueExchange(@Qualifier("aliceQueue") Queue queue, @Qualifier("aliceExchange") Exchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with("alice.#").noargs();

}

}

生产者:

MessageProvider

package com.king.alice.rabbitmq.delay.provider;

import cn.hutool.core.date.DateUtil;

import com.king.alice.common.json.JSON;

import com.king.alice.rabbitmq.config.RabbitMqConfig;

import com.king.alice.rabbitmq.delay.bean.DelayMessage;

import lombok.extern.slf4j.Slf4j;

import org.junit.Assert;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Publishes {@link DelayMessage}s to the delay exchange, stamping each message with its own
 * expiration derived from {@link DelayMessage#getDelay()}.
 *
 * <p>Per the RabbitMQ TTL documentation, a message's expiration is only acted on once it reaches
 * the head of its queue, so messages with different delays sharing one queue may leave it later
 * than their own expiration.
 *
 * @author 大魔王
 */
@Slf4j
@Component
public class MessageProvider {

    /** Milliseconds per second; a {@code long} so the TTL arithmetic cannot overflow {@code int}. */
    private static final long MILLIS_PER_SECOND = 1000L;

    private final RabbitTemplate rabbitTemplate;

    /**
     * @param rabbitTemplate the template used for publishing
     */
    @Autowired
    public MessageProvider(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Sends a delayed message.
     *
     * @param delayMessage the payload; its {@code getDelay()} value is interpreted as seconds
     * @throws IllegalArgumentException if {@code delayMessage} is {@code null} or its delay is
     *                                  negative
     */
    public void sendMessage(DelayMessage delayMessage) {
        // Plain argument checks instead of JUnit's Assert, which throws AssertionError and
        // belongs to the test framework rather than production code.
        if (delayMessage == null) {
            throw new IllegalArgumentException("delayMessage must not be null");
        }
        int delaySeconds = delayMessage.getDelay();
        if (delaySeconds < 0) {
            throw new IllegalArgumentException("delay must not be negative: " + delaySeconds);
        }

        // Multiply as long: int arithmetic wraps for delays above Integer.MAX_VALUE / 1000 seconds.
        String expirationMillis = String.valueOf(delaySeconds * MILLIS_PER_SECOND);

        log.info(" now date {},delay {} seconds to write to the message queue", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), delaySeconds);

        rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY, delayMessage,
                message -> {
                    message.getMessageProperties().setExpiration(expirationMillis);
                    return message;
                });
    }
}

消费者:

package com.king.alice.rabbitmq.delay.consumer;

import cn.hutool.core.util.ObjectUtil;

import com.king.alice.common.json.JSONObject;

import com.king.alice.rabbitmq.delay.bean.AliceMessage;

import com.king.alice.rabbitmq.delay.bean.DelayMessage;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.AcknowledgeMode;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageListener;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.beans.factory.ObjectProvider;

import org.springframework.stereotype.Component;

import org.springframework.util.CollectionUtils;

import java.lang.reflect.ParameterizedType;

import java.lang.reflect.Type;

import java.util.List;

import java.util.Map;

import java.util.Optional;

import java.util.concurrent.ConcurrentHashMap;

import java.util.stream.Collectors;

/**

* @author 大魔王

*/

@Slf4j

public class MessageConsumer implements MessageListener {

private final Map> strategyMap = new ConcurrentHashMap<>();

public MessageConsumer(ObjectProvider> stategyProvider) {

List handleList = stategyProvider.getIfAvailable();

Optional> optionalStrategies = Optional.ofNullable(handleList);

optionalStrategies.ifPresent(strategies -> strategies.stream().filter(strategy -> {

Type genericInterface = strategy.getClass().getGenericInterfaces()[0];

return genericInterface instanceof ParameterizedType;

}).map(strategy -> ((ParameterizedType) strategy.getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0])

.collect(Collectors.toSet()).forEach(delayMessages -> {

List collect = strategies.stream().filter(strategy -> {

Type genericInterface = strategy.getClass().getGenericInterfaces()[0];

if (genericInterface instanceof ParameterizedType) {

Type actualTypeArgument = ((ParameterizedType) genericInterface).getActualTypeArguments()[0];

return delayMessages.getTypeName().equals(actualTypeArgument.getTypeName());

}

return false;

}).collect(Collectors.toList());

strategyMap.put(delayMessages, collect);

}));

}

@Override

public void onMessage(Message message) {

MessageConverter messageConverter = new Jackson2JsonMessageConverter();

DelayMessage delayMessage = (DelayMessage) messageConverter.fromMessage(message);

List strategyList = strategyMap.get(delayMessage.getClass());

if (!CollectionUtils.isEmpty(strategyList)) {

strategyList.forEach(strategy -> strategy.handle(delayMessage));

} else {

log.info("Missing message processing class");

}

}

}

策略相关Bean,接口以及实现类

DelayMessage

package com.king.alice.rabbitmq.delay.bean;

/**
 * A message that reports its own delay.
 *
 * @author 大魔王
 */
public interface DelayMessage {

    /**
     * Returns the delay of this message.
     *
     * @return the delay, expressed in seconds
     */
    int getDelay();
}

AliceMessage

package com.king.alice.rabbitmq.delay.bean;

import lombok.Getter;

import lombok.Setter;

/**
 * Delayed message carrying an e-mail address, an order type and an execution counter.
 *
 * @author 大魔王
 */
@Getter
@Setter
public class AliceMessage implements DelayMessage {

    /** User e-mail address. */
    String email;

    /** Order type. */
    String orderType;

    /** Number of times the message has been processed. */
    Integer dealCount;

    /** Delay in seconds. */
    int delay;

    @Override
    public int getDelay() {
        return this.delay;
    }

    public void setDelay(int delay) {
        this.delay = delay;
    }

    /**
     * Returns a field-by-field description, so that logging an instance with a {@code {}}
     * placeholder prints its content instead of {@code ClassName@hash}.
     */
    @Override
    public String toString() {
        return "AliceMessage{"
                + "email='" + email + '\''
                + ", orderType='" + orderType + '\''
                + ", dealCount=" + dealCount
                + ", delay=" + delay
                + '}';
    }
}

UserMessage

package com.king.alice.rabbitmq.delay.bean;

import lombok.Getter;

import lombok.Setter;

/**
 * Delayed message carrying a user name, a token and an execution counter.
 *
 * @author 大魔王
 */
@Getter
@Setter
public class UserMessage implements DelayMessage {

    /** User name. */
    String username;

    /** Token. */
    String token;

    /** Number of times the message has been processed. */
    Integer dealCount;

    /** Delay in seconds. */
    int delay;

    @Override
    public int getDelay() {
        return this.delay;
    }

    public void setDelay(int delay) {
        this.delay = delay;
    }

    /**
     * Returns a description suitable for logging, so that a {@code {}} placeholder prints the
     * content instead of {@code ClassName@hash}. The token value is masked to keep it out of
     * log output.
     */
    @Override
    public String toString() {
        return "UserMessage{"
                + "username='" + username + '\''
                + ", token=" + (token == null ? "null" : "***")
                + ", dealCount=" + dealCount
                + ", delay=" + delay
                + '}';
    }
}

Strategy

package com.king.alice.rabbitmq.delay.consumer;

import com.king.alice.rabbitmq.delay.bean.DelayMessage;

/**
 * Handler for one concrete kind of {@link DelayMessage}.
 *
 * @param <T> the message type this strategy handles
 * @author 大魔王
 */
public interface Strategy<T extends DelayMessage> {

    /**
     * Processes a received message.
     *
     * @param delayMessage the received message
     */
    void handle(T delayMessage);
}

AliceMessageHandler

package com.king.alice.rabbitmq;

import com.king.alice.common.json.JSON;

import com.king.alice.common.json.JSONObject;

import com.king.alice.rabbitmq.delay.bean.AliceMessage;

import com.king.alice.rabbitmq.delay.consumer.Strategy;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

/**
 * Strategy for {@link AliceMessage}: logs the received message as JSON.
 *
 * @author 大魔王
 */
@Component
@Slf4j
public class AliceMessageHandler implements Strategy<AliceMessage> {

    /**
     * Logs the message content.
     *
     * @param delayMessage the received message
     */
    @Override
    public void handle(AliceMessage delayMessage) {
        log.info("AliceMessage响应体{}", JSONObject.parseObject(JSON.toJSONString(delayMessage)));
    }
}

UserMessageHandler

package com.king.alice.rabbitmq;

import com.king.alice.common.json.JSON;

import com.king.alice.common.json.JSONObject;

import com.king.alice.rabbitmq.delay.bean.UserMessage;

import com.king.alice.rabbitmq.delay.consumer.Strategy;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

/**
 * Strategy for {@link UserMessage}: logs the received message as JSON.
 *
 * @author 大魔王
 */
@Slf4j
@Component
public class UserMessageHandler implements Strategy<UserMessage> {

    /**
     * Logs the message content.
     *
     * @param delayMessage the received message
     */
    @Override
    public void handle(UserMessage delayMessage) {
        log.info("UserMessage响应体{}", JSONObject.parseObject(JSON.toJSONString(delayMessage)));
    }
}

接下来 我们写个controller测试一下

SysAccountController

package com.king.alice.manage.sys.controller;

import cn.hutool.core.date.DateUtil;

import com.king.alice.manage.sys.entity.SysAccount;

import com.king.alice.manage.sys.service.SysAccountService;

import com.king.alice.rabbitmq.delay.bean.AliceMessage;

import com.king.alice.rabbitmq.delay.bean.UserMessage;

import com.king.alice.rabbitmq.delay.provider.MessageProvider;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.domain.Page;

import org.springframework.data.domain.PageRequest;

import org.springframework.http.ResponseEntity;

import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

import java.util.Date;

/**
 * Controller for the account table (SysAccount); also exposes two endpoints that publish
 * delayed messages through {@link MessageProvider}.
 *
 * @author makejava
 * @since 2023-08-09 11:40:16
 */
@RestController
@Slf4j
public class SysAccountController {

    /** Service object. */
    @Resource
    private SysAccountService sysAccountService;

    @Autowired
    private MessageProvider messageProvider;

    /**
     * Publishes an {@link AliceMessage} as a delayed message.
     *
     * @param aliceMessage the request body
     * @return the literal {@code "success"}
     */
    @PostMapping("/send-alice-message")
    public String sendMsg(@RequestBody AliceMessage aliceMessage) {
        // Log receipt before publishing so the request is recorded even if publishing throws.
        log.info("当前时间:{},收到aliceMessage请求,msg:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), aliceMessage);
        messageProvider.sendMessage(aliceMessage);
        return "success";
    }

    /**
     * Publishes a {@link UserMessage} as a delayed message.
     *
     * @param userMessage the request body
     * @return the literal {@code "success"}
     */
    @PostMapping("/send-user-message")
    public String sendMsg(@RequestBody UserMessage userMessage) {
        // Log receipt before publishing so the request is recorded even if publishing throws.
        log.info("当前时间:{},收到userMessage请求,msg:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), userMessage);
        messageProvider.sendMessage(userMessage);
        return "success";
    }
}

调接口

第一个策略: 控制台打印 第二个策略: 延时12秒成功接收到消息

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: