SpringBoot 中 使用RabbtiMq 

如图使用redisTemplate 一样的简单方便

模拟发送邮件的情况

pom.xml  

org.springframework.boot

spring-boot-starter-amqp

org.springframework.amqp

spring-rabbit-test

test

application.properties

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

spring.rabbitmq.host=192.168.91.128

spring.rabbitmq.port=5672

## 根据自己情况而定,可以不用

spring.rabbitmq.listener.simple.acknowledge-mode=manual

spring.rabbitmq.listener.simple.prefetch=100

写在配置文件中,由 RabbitProperties 这个类进行读取,封装到ConnectionFactory 中。

MailConstants (常量)

public class MailConstants {

public static final Integer DELIVERING = 0;//消息投递中

public static final Integer SUCCESS = 1;//消息投递成功

public static final Integer FAILURE = 2;//消息投递失败

public static final Integer MAX_TRY_COUNT = 3;//最大重试次数

public static final Integer MSG_TIMEOUT = 1;//消息超时时间

public static final String MAIL_QUEUE_NAME = "javaboy.mail.queue";

public static final String MAIL_EXCHANGE_NAME = "javaboy.mail.exchange";

public static final String MAIL_ROUTING_KEY_NAME = "javaboy.mail.routing.key";

}

RabbitConfig (rabbitMq的配置类)

import org.javaboy.vhr.model.MailConstants;

import org.javaboy.vhr.service.MailSendLogService;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitConfig {

public final static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

@Autowired

CachingConnectionFactory cachingConnectionFactory;

//发送邮件的

@Autowired

MailSendLogService mailSendLogService;

@Bean

RabbitTemplate rabbitTemplate() {

RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);

//手动应答返回的标志

rabbitTemplate.setConfirmCallback((data, ack, cause) -> {

String msgId = data.getId();

if (ack) {

logger.info(msgId + ":消息发送成功");

mailSendLogService.updateMailSendLogStatus(msgId, 1);//修改数据库中的记录,消息投递成功

} else {

logger.info(msgId + ":消息发送失败");

}

});

rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> {

logger.info("消息发送失败");

});

return rabbitTemplate;

}

@Bean

Queue mailQueue() {

return new Queue(MailConstants.MAIL_QUEUE_NAME, true);

}

@Bean

DirectExchange mailExchange() {

return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME, true, false);

}

@Bean

Binding mailBinding() {

return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);

}

}

MailSendTask(定时任务,发送)

@Component

public class MailSendTask {

@Autowired

MailSendLogService mailSendLogService;

@Autowired

RabbitTemplate rabbitTemplate;

@Autowired

EmployeeService employeeService;

@Scheduled(cron = "0/10 * * * * ?")

public void mailResendTask() {

List logs = mailSendLogService.getMailSendLogsByStatus();

if (logs == null || logs.size() == 0) {

return;

}

logs.forEach(mailSendLog->{

if (mailSendLog.getCount() >= 3) {

mailSendLogService.updateMailSendLogStatus(mailSendLog.getMsgId(), 2);//直接设置该条消息发送失败

}else{

mailSendLogService.updateCount(mailSendLog.getMsgId(), new Date());

Employee emp = employeeService.getEmployeeById(mailSendLog.getEmpId());

/**

* 参数1:交换机名称

* 参数2 :路由key

* 参数三:数据

* 参数4:作为唯一标识

*

*/

rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(mailSendLog.getMsgId()));

}

});

}

}

MailReceiver(接收端)

@Component

public class MailReceiver {

public static final Logger logger = LoggerFactory.getLogger(MailReceiver.class);

@Autowired

JavaMailSender javaMailSender;

@Autowired

MailProperties mailProperties;

@Autowired

TemplateEngine templateEngine;

@Autowired

StringRedisTemplate redisTemplate;

@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)

public void handler(Message message, Channel channel) throws IOException {

Employee employee = (Employee) message.getPayload();

MessageHeaders headers = message.getHeaders();

Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

String msgId = (String) headers.get("spring_returned_message_correlation");

if (redisTemplate.opsForHash().entries("mail_log").containsKey(msgId)) {

//redis 中包含该 key,说明该消息已经被消费过

logger.info(msgId + ":消息已经被消费");

channel.basicAck(tag, false);//确认消息已消费

return;

}

//收到消息,发送邮件

MimeMessage msg = javaMailSender.createMimeMessage();

MimeMessageHelper helper = new MimeMessageHelper(msg);

try {

helper.setTo(employee.getEmail());

helper.setFrom(mailProperties.getUsername());

helper.setSubject("入职欢迎");

helper.setSentDate(new Date());

Context context = new Context();

context.setVariable("name", employee.getName());

context.setVariable("posName", employee.getPosition().getName());

context.setVariable("joblevelName", employee.getJobLevel().getName());

context.setVariable("departmentName", employee.getDepartment().getName());

//根据模板发送

String mail = templateEngine.process("mail", context);

helper.setText(mail, true);

javaMailSender.send(msg);

redisTemplate.opsForHash().put("mail_log", msgId, "javaboy");

channel.basicAck(tag, false);

logger.info(msgId + ":邮件发送成功");

} catch (MessagingException e) {

//手动应答, tag 消息id ,、

channel.basicNack(tag, false, true);

e.printStackTrace();

logger.error("邮件发送失败:" + e.getMessage());

}

}

}

使用总结

0. rabbtMq的本地服务,得开启。(跟redis差不多)

1. 写 application.properties中的rabbitMq的连接配置等

2. rabbitConfig配置文件。(包括:交换机选择与队列的配置,绑定),选择的模式在这里配置

3. 直接使用,导入rabbitTemplate类,使用rabbitTemplate.convertAndSend()方法

4. 接收类

@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)  public void handler(Message message, Channel channel) throws IOException {

        业务逻辑了

        手动接收等等

}

相关文章:

1. rabbitMq基础结构图

2. channel接口常用方法

3. rabbitTemplate模板

4. rabbitMq的笔记1

5. rabbitMq笔记2

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: