基于API的方式

        1.使用AmqpAdmin定制消息发送组件

@Autowired

private AmqpAdmin amqpAdmin;

@Test

public void amqpAdmin(){

//1.定义fanout类型的交换器

amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));

//2.定义两个默认持久化队列,分别处理email和sms

amqpAdmin.declareQueue(new Queue("fanout_queue_email"));

amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));

//3.将队列分别与交换器进行绑定

// 队列名 是队列 交换机的名称 路由 其它参数

amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));

amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));

}

    

   

2.消息发送者发送消息

             创建实体类

                

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

@Data

@AllArgsConstructor

@NoArgsConstructor

public class User {

private Integer id;

private String name;

}

                发送消息

 

@Autowired

private RabbitTemplate re;

@Test//消息发送者

public void subPublisher(){

User user = new User(1,"小满");

re.convertAndSend("fanout_exchange", "", user);

}

                        如图所以,如果我们直接发送的话就会报这个错,有两种解决方法,第一种是比较常用的让实体类User实现序列化Serializable接口,这里我们不做演示,第二种是写一个配置类,只有在RabbitMQ可以使用

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitMQConfig {

//定制JSON格式的消息转化器

@Bean

public MessageConverter messageConverter(){

return new Jackson2JsonMessageConverter();

}

}

        加上配置类后我们发送就不会报错了,我们也可以在RabbitMQ的可视化端口看到我们发送的消息

 

 

        3.发送完消息后接下来就是消费消息了,定义接收消息的业务

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Service;

@Service

public class RabbitMQService {

//发布订阅模式: @RabbitListener可以指定当前方法监控哪一个队列

@RabbitListener(queues = "fanout_queue_email")//消费者可以消费多个队列的消息

public void subConsumerEmail(Message message){

//当队列中有内容是方法会自动执行 推荐Object来接收

//官网推荐Message

byte[] body = message.getBody();//Message将数据存放在body中

String msg = new String(body);

System.out.println("邮件业务接收到消息:"+msg);

}

@RabbitListener(queues = "fanout_queue_sms")

public void subConsumerSms(Message message){

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("短信业务接收到消息:"+msg);

}

}

        4.重新运行发送端就可以接收到我们发送的数据,接收的数据可能打印在任意一个控制台中,这是idea的机制,我们不需要管 

基于配置类的方式

        1.在config配置类中定义

import org.springframework.amqp.core.*;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitMQConfig {

//定制JSON格式的消息转化器

@Bean

public MessageConverter messageConverter(){

return new Jackson2JsonMessageConverter();

}

// 1.fanout创建一个交换机

@Bean

public Exchange fanoutExchange(){

return ExchangeBuilder.fanoutExchange("fanout_exchange").build();

}

//2.定义消息队列

@Bean

public Queue fanoutQueueEmail(){

return new Queue("fanout_queue_email");

}

@Bean

public Queue fanoutQueueSms(){

return new Queue("fanout_queue_sms");

}

//3.将创建的队列绑定到对应的交换机上

@Bean

public Binding bingingEmail(){

return BindingBuilder.bind(fanoutQueueEmail()).to(fanoutExchange()).with("").noargs();

}

@Bean

public Binding bingingSms(){

return BindingBuilder.bind(fanoutQueueSms()).to(fanoutExchange()).with("").noargs();

}

}

        2.为了避免api的影响,我们可以在可视化端口将基于api创建的交换机和队列删除

                1)删除交换机

 

 

                2)删除队列,前面也是点击队列的名字 

        可以看到我已经将交换机和消息队列都已经删除,接下来我们重新启动项目 ,配置类可以在启动的时候自动创建

 

         我们的订阅发布模式也是可以正常运行

        

基于注解类的方式        

        1.我们要现将基于配置类的方式注释掉,避免影响我们测试

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Service;

@Service

public class RabbitMQService {

@RabbitListener(bindings = @QueueBinding(

value = @Queue("fanout_queue_email"),

exchange=@Exchange(value = "fanout_exchange",type = "fanout")

))

public void subConsumerEmail(Message message){

//当队列中有内容是方法会自动执行 推荐Object来接收

//官网推荐Message

byte[] body = message.getBody();//Message将数据存放在body中

String msg = new String(body);

System.out.println("邮件业务接收到消息:"+msg);

}

@RabbitListener(bindings = @QueueBinding(

value = @Queue("fanout_queue_sms"),

exchange=@Exchange(value = "fanout_exchange",type = "fanout")

))

public void subConsumerSms(Message message){

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("短信业务接收到消息:"+msg);

}

}

        提前将交换机和队列删除,然后运行,就会发现会在启动时会自动生成交换机和队列,测试也不会有影响

 

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: