RabbitMQ的简单介绍:    

     首先,我们先了解一下RabbitMQ的含义。RabbitMQ是一个开源的消息中间件,它实现了高级消息队列协议(AMQP)用于进行异步通信。

这里简单的介绍一下异步通信:异步通信是一种通信模式,其中发送方和接收方的操作不是同步进行的。在异步通信中,发送方向接收方发送消息,然后继续执行其他操作,而不必等待接收方的响应。接收方在接收到消息后,可以处理消息,并在处理完成后向发送方发送响应。(举个例子:就好比你在微信上向一个好友发了一条消息,你的好友不一定要立即回复你,而是他有时间了再回复你,你也不是一直守着等他回复你。)

RabbitMQ基于消息队列的模式,通过将消息发送到队列中,然后由消费者从队列中取出并处理这些消息。主要的概念如下:

Producer(生产者):负责发送消息到RabbitMQ的队列中。 Consumer(消费者):从RabbitMQ的队列中接收并处理消息。 Queue(队列):用于存储消息的缓冲区,生产者将消息发送到队列中,消费者从队列中接收消息。 Exchange(交换机):接收来自生产者的消息,并根据路由规则将消息发送到一个或多个队列中。 Binding(绑定):用于将交换机和队列进行绑定,指定消息的路由规则。 Routing Key(路由键):用于将消息从交换机路由到特定的队列。

你要先对着这些概念有个印象,接下来让我们用编码的方式来进行实现。

环境及项目的搭建:

本次我们使用JDK17、spring boot3.0、RabbitMQ3.9.11

本次采用父子工程的形式,一个父项目下面有两个子模块。父项目是一个空的maven项目,子模块为:生产者模块(Producer)和消费者模块(Consumer)两个子模块都是spring boot项目。在两个子模块中引入RabbitMQ的依赖AMQP,

AMQP是一种通信协议(类似于HTTP请求协议)RabbitMQ遵从AMQP,而spring boot整合过AMQP,所以只要引入AMQP的依赖就可以实现RabbitMQ,两个子模块的pom.xml文件如下:

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-amqp

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

接下来,我们使用docker来创建RabbitMQ容器(默认已经安装过docker了):

docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management  

运行这条命令就可以创建一个docker容器,解释一下我们会用到的两个端口,

5672:是RabbitMQ默认的AMQP协议端口,用于客户端应用程序与RabbitMQ服务之间的通信。(服务端)

15672:是RabbitMQ的Web管理界面的默认端口,用于通过浏览器访问RabbitMQ的管理界面(客户端)

使用http://虚拟机地址:15672  就可以访问到。第一次登录,用户名和密码,你可以任意填,但是填入之后就会保存,下一次登录,就需要用到这次填的用户名和密码。

这就是登录成功的页面,接下来,我们根据这个用户来进行一系列的操作。

代码的实现:

在我们两个子项目的配置文件中加入关于RabbitMQ的配置信息:

spring:

rabbitmq:

  port: 5672 

host: 192.168.231.110

username: zhangqiao

password: 123456

    virtual-host: /

host:你的虚拟机地址

post:RabbitMQ的服务端口号,一般为5672

username:用户名

password:密码

 virtual-host:虚拟主机号,类似于分组,用来隔离不同的使用场景,每个虚拟主机都是一个独立的消息代理

接下来我先解释一下前面提到的6个概念,

生产者:见名知意,就是生产消息的一方。对应的模块为Producer,作用是发送消息

消费者:接收消息的一方,对应的模块为Consumer,接收Producer发送的消息。

注意:生产者和消费者的身份是相对的,A要对B发消息,A是生产者,B是消费者。不能单指某模块为生产者或消费者,且这两者的身份因为业务的不同,随时可以发生调换。

Queue(队列):用于存储消息的缓冲区,生产者将消息发送到队列中,消费者从队列中接收消息。但是我们一般不这样做,这样太直接了,而且也不能在进行其他的操作。这个时候我们会引入一个中间量:Exchange(交换机)。

Exchange(交换机):接收来自生产者的消息,并根据路由规则将消息发送到一个或多个队列中。

Binding(绑定):用于将交换机和队列进行绑定,指定消息的路由规则。

Routing Key(路由键):用于将消息从交换机路由到特定的队列。(并不是交换机与队列绑定了,就一定会将消息发送给队列。绑定只是一个前提,还要满足路由键(某些指定的条件)才行)

现在,我想你已经了解了它们之间的关系,生产者将消息发送给交换机,交换机与指定的对列进行绑定,并通过路由键判定交换机是否发送消息给队列。消费者监听队列,就能获得生产者发送的消息。

生产者与交换机、交换机与队列、队列与消费者;它们之间都是多对多的关系。

一个生产者可以绑定多个交换机,一个交换机也可以被多个生产者绑定。交换机与队列、队列与消费者之间亦是如此。

常用的有三种类型的交换机:

Direct、Topic、Fanout交换机,下面我们会进行详细介绍;

接下来在idea中编写代码:

首先,我们编写生产者(Producer)模块代码:按照我们之前的逻辑,生产者是要往交换机发送消息的,那么我们的首先拥有交换机才行。在RabbitMQ中创建交换机一般有两种常用方式,接下来我们挨个实现。

1、使用bean的方式新建交换机和队列,并指定绑定关系

@Configuration public class RabbitMQConfig {

    @Bean     public DirectExchange directExchange() {         return new DirectExchange("zhang.direct01");     }

    @Bean     public Queue defaultQueue() {         return new Queue("zhang.queue01");     }

    @Bean     public Binding defaultBinding(Queue defaultQueue, DirectExchange directExchange) {         return BindingBuilder.bind(defaultQueue).to(directExchange).with("red");     } }  

创建一个Direct交换机zhang.direct01,创建一个消息队列zhang.queue01,将他们进行绑定,指定他们之间的路由键为red。

2、使用注解建立交换机和队列,并指定绑定关系

@RabbitListener(bindings = @QueueBinding(

value = @Queue("zhang.queue01"),

exchange = @Exchange(value = "zhang.direct01",type = "direct"),

key = "red"

))

这个注解必须要依托在方法上,不能独自使用。

@Queue("zhang.queue01"):新建一个zhang.queue01队列。(如果已存在,不做任何处理)

@Exchange(value = "zhang.direct01",type = "direct"):新建一个zhang.direct01交换机,类型为direct(如果已存在,不做任何处理)

key:路由键red。 

使用注解创建的交换机和队列是绑定的。(推荐使用第二种注解方式,简单、直白)

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

void DirectSendMessage() {

rabbitTemplate.convertAndSend("zhang.direct01","","张三,你好啊");

 rabbitTemplate.convertAndSend("zhang.direct01","red","张三,你好啊。路由键为red");

}

引入rabbitTemplate,使用convertAndSend方法就能发送消息到交换机,zhang.direct01是交换机的名称,第二个参数是路由键,(也可以不设置,不设置时表示每个与交换机绑定的队列都能接收到消息)、第三个参数就是要发送的消息。

现在,我们已经创建了交换机zhang.direct01,创建了消息队列zhang.queue01,并将它们进行了绑定,指定他们的路由键为red。并且向zhang.direct01发送了两条消息,一条没有指定路由键,一条指定路由键为red。(没有指定路由键时,那么所有与之绑定的队列都能接受到消息)

现在,我们就可以在消费者模块(Consumer)编写代码,来监听队列了。

// 监听普通的消息

@RabbitListener(queues = {"zhang.queue01"})

public void listenerQueue(String msg){

System.out.println("消费者监听了队列zhang.queue01,接收到消息:"+msg);

}

我们在一个bean中编写监听代码,使用@RabbitListener注解,并指定监听的消息队列为zhang.queue01,接收一个String类型的参数msg,那么队列中缓存的消息就会映射到msg中,在控制台进行输出,(这个参数类型要与生产者发送到交换机的消息类型一致,不然会报错)将这个模块运行起来就能一直监听zhang.queue01这个消息队列了。

如图所示,我们监听到了,zhang.queue01,这个对类中的所有消息。

现在,我们已经能发送和接收消息了,接下来,我们重点关注一下交换机和 路由键。

1、Direct Exchange(直连交换机): Direct Exchange 是最简单的交换机类型。它根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。如果消息的路由键与某个队列绑定的路由键完全相同,那么消息将被发送到该队列。

2、Fanout Exchange(扇形交换机): Fanout Exchange 将消息广播到与之绑定的所有队列。它忽略消息的路由键,只需将消息发送到与之绑定的所有队列即可。

3、Topic Exchange(主题交换机): Topic Exchange 根据消息的路由键模式进行匹配和分发。它使用带有通配符的路由键进行匹配,支持两种通配符符号:*(匹配一个单词)和#(匹配零个或多个单词)。例如,路由键 "stock.usd.nyse" 可以匹配绑定了 "stock..*" 或者 "stock.#" 的队列。

这三种路由键是我们最常用到的,我们之前建立的就是Direct类型的交换机,接下来我们建立第二种交换机Fanout,并绑定队列

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "zhang.queue02",durable = "true"),

exchange = @Exchange(value = "zhang.fanout",type = ExchangeTypes.FANOUT)

))

@Test

void FanoutSendMessage() {

rabbitTemplate.convertAndSend("zhang.fanout","","张三,你好啊,广播模式");

}

Fanout忽略路由键,所以我们创建时,可以不写路由键。

在消费者模块监听zhang.queue02队列

//监听广播消息

@RabbitListener(queues = {"zhang.queue02"})

public void listenerBroadcast(String msg){

System.out.println("消费者监听了队列zhang.queue02,接收到消息:"+msg);

}

重新启动模块,就能监听到zhang.queue02的消息:

建立Topic 交换机,它与Direct交换机不同的是,Topic的路由键可以使用通配符,

*(星号):匹配一个单词。例如,绑定键为 "stock.*" 的队列将匹配到路由键为 "stock.apple"、"stock.microsoft"等。 #(井号):匹配零个或多个单词。例如,绑定键为 "stock.#" 的队列将匹配到路由键为 "stock"、"stock.apple"、"stock.apple.us"等。

接下来,我们来演示一下:

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "zhang.queue03"),

exchange = @Exchange(value = "zhang.topic01",type = ExchangeTypes.TOPIC),

key = {"china.*","#.queue","mm.nn"}

))

@Test

void TopicSendMessage() {

rabbitTemplate.convertAndSend("zhang.topic01","mm.nn","张三,你好啊,mm.nn");

rabbitTemplate.convertAndSend("zhang.topic01","mm.queue","张三,你好啊,mm.queue");

rabbitTemplate.convertAndSend("zhang.topic01","china.nn","张三,你好啊,china.nn");

rabbitTemplate.convertAndSend("zhang.topic01","mm.china","张三,你好啊,mm.china");

rabbitTemplate.convertAndSend("zhang.topic01","queue.nn","张三,你好啊,queue.nn");

}

我们创建了zhang.queue03队列,绑定了zhang.topic01,并制定了三个路由键。使用了通配符的形式。然后发送了五条消息,第四、五条消息不符合规则。因此不能接受到。

到这里,我们整合RabbitMQ实现快速入门就完成了。然后补充一点知识:

      如果发送的消息为对象类型时,由于Rabbit默认使用了JDK的序列化方式(底层会判断你传入的是不是message类型,如果不是会使用消息类型转化器转为消息类型),就算你收发双方都使用了同一种类型。那么,消费者在接收时也会报错,这是由于接收双发使用的消息类型转化器不一致。

生产者发送的消息类型为Student

消费者接收的消息类型也为Student

运行时报错,原因是:发送方与接收方的消息类型转换器不统一:

我们有两种解决方案,

<一>:自定义消息类型转换器,使接收方与发送方的消息类型转换器一致

导入json的依赖,MessageConverter 的类型为:

@Configuration

public class MessageConvent {

@Bean

public MessageConverter jsonMessageConvert() {

return new Jackson2JsonMessageConverter();

}

}

在收发双方同时加入自定义消息类型转换器就可以了。

<二>:在发送消息时,将对象转化为字符串,接收方依旧使用String接收。

第一种方式更加正规一些,但是第二种方式简单一些。根据自己的需求选择适用于自己的方式。

一般来说:在生产者中,进行交换机的创建、消息队列的创建、绑定关系,与发送消息(推荐使用@RabbitListener注解)

在消费者中,只进行监听队列的绑定。。。。。

我在使用@RabbitListener注解时遇到了一些问题,在这里我记录一下,来警示自己。

我在使用@RabbitListener创建交换机与队列时,有时候会报错误,显示找不到队列。我感到很惊奇,明明@RabbitListener注解就是用来创建交换机和队列的,怎么会显示找不到队列呢?我之后请教了学校的老师后才发现,原来使用@RabbitListener创建交换机与队列时,有一些前置的条件要满足。

@RabbitListener 注解会自动声明队列、交换机和绑定。但是,它只会在第一次启动应用程序时执行,以确保队列、交换机和绑定的声明只发生一次。如果在后续的运行中,队列已经存在或者配置发生了变化,@RabbitListener 注解将不会再次声明队列。

因此,如果你在代码中使用 @RabbitListener注解监听的队列名称,但是该队列在 RabbitMQ 中并不存在,那么在第一次运行时会自动创建该队列。但如果在后续的运行中,你手动删除了该队列,或者更改了队列的配置,再次启动应用程序时@RabbitListener注解不会再次创建该队列。

解决该问题的方法是,在应用程序启动之前,在控制台(15672)手动创建队列或者在配置文件中配置队列的声明。这样可以确保队列的存在,并且与 @RabbitListener 注解中指定的队列名称一致。

参考阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: