参考书籍:朱忠华的《RabbitMQ实战指南》

一、基础概念

1.Exchange

1.1 创建方法的参数,exchangeDeclare()

exchange:交换器的名称type:交换器的类型durable:是否持久化,true代表持久化。(持久化会将交换器存入磁盘)autoDelete:是否自动删除,true表示自动删除。(当该交换器上绑定的最后一个队列/交换器解除绑定后,该交换器自动删除)internal:是否是内置的,true表示内置交换器。(生产者无法直接发消息给内置交换器,只能通过其他交换器路由到该交换器)argument:其他一些结构化的参数

1.2 交换器类型

1.2.1 fanout

把所有消息路由到与该交换器绑定的队列

1.2.2 direct

路由时需要BindingKey和RoutingKey完全匹配

1.2.3 topic

路由到BindingKey和RoutingKey相匹配的队列

RoutingKey和BindingKey为以“.”分隔的字符串BindingKey中可以存在“*”(匹配一个单词),“#”(匹配多个单词,也可以是0个)

1.2.4 header

不依赖路由键的匹配规则来路由消息,而是根据发送的消息内容中的header属性进行匹配

1.3 备份交换器

生产者发送消息时未设置mandatory参数,若根据路由键没有符合的队列该消息将会丢失如果设置mandatory参数则需要增加程序对应的处理逻辑,相对复杂可以设置备份交换器来存储这部分消息,后面再根据需要进行处理这部分消息

2.Queue

2.1 创建方法的参数,queueDeclare()

queue:队列的名称durable:是否持久化exclusive:是否排他,true表示排他。(队列仅对首次声明他的连接可见,并在该连接断开后自动删除)

排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)可以同时访问该队列 autoDelete:是否自动删除,true表示自动删除。(至少有一个消费者连接到这个队列,之后所有与这个连接的消费者都断开时,才会自动删除)arguments:其他一些参数

x-message-ttl:指定队列中消息的过期时间(以毫秒为单位)。超过指定时间的消息将被自动删除。x-max-length:指定队列中消息的最大数量。不设置的情况下,将没有长度限制,无限制地消耗系统资源。x-max-length-bytes:指定队列中消息的最大总字节数。当队列中的消息总字节数达到最大值时,新的消息将被拒绝或丢弃。x-expires:指定队列的过期时间(以毫秒为单位)。当队列在指定时间内未被使用时,将自动删除。x-dead-letter-exchange:指定一个交换机名称,用于接收被拒绝或过期的消息。这些消息将被重新路由到指定的交换机中。x-dead-letter-routing-key:指定一个路由键,用于重新路由被拒绝或过期的消息。x-overflow:队列消息数量超过限制后的策略模式。默认采用drop-head策略

drop-head:新的消息将覆盖队列中最早的消息reject-publish:新的消息将被拒绝并丢弃reject-publish-dlx:新的消息将被拒绝并发送到死信交换机 x-max-priority:队列消费的优先级顺序…

3.死信队列

DLX,全称Dead-Letter-Exchange,死信交换器。当消息在一个队列中变成死信后,会被重新发送到另一个交换器,这个交换器就是DLX,其绑定的队列称之为死信队列

3.1消息变成死信的情况

消息被拒绝且requeue设置为false消息过期队列达到最大长度,且x-overflow策略为reject-publish-dlx

3.2 流程图

4.延迟队列

延迟队列存储的对象是延迟消息,就是在某些特殊情况下,需要让消息在一定时间后再交给消费者进行消费的场景。

实现逻辑

和上面死信队列中的流程图一致,只是上图中消息进入死信队列的情况有三种,延迟队列只是通过消息过期来这一种情况来实现延迟消息中,queue1不绑定消费者,消费者绑定queue2,等待消息过期后进行消费

5. 优先级队列

这类队列中具有高优先级的消息优先被消费

5.1 注意事项

首先queue需要设置x-max-priority属性,且messge也需要设置priority属性如果只设置message的priority属性,不设置queue的x-max-priority属性。则消息的 priority 值只是一个消息属性,但不会影响消息的处理顺序消息的优先级priority为0-9,值为0代表会被最后消费x-max-priority值代表该队列能识别priority的最大值,priority大于该值的消息都视同为x-max-priority的值

5.2 例子

如果队列的x-max-priority为5,分别有priority为0,1,2,3,4,5,6,7,8,9的消息若干条,将会以怎么样的顺序进行消费?

队列的x-max-priority为5,所以所有优先级大于5的消息(即优先级为6,7,8,9的消息)都将被视为优先级为5。 优先级为5(包括原本优先级为6,7,8,9的消息)的消息将首先被消费然后消费优先级为4的消息然后消费优先级为3的消息…然后消费优先级为0的消息

6.持久化

6.1 分类

6.1.1 交换器的持久化

通过声明交换器的时候设置durable参数如果不做持久化,重启服务后,交换器的元数据会丢失,但是消息不会丢失

6.1.2 队列的持久化

通过声明队列的时候设置durable参数如果不做持久化,重启服务后,队列的元数据会丢失,消息数据也会丢失

6.1.3 消息的持久化

通过生产消息时候设置basicProperties的deliveryMode参数

6.2 注意事项

设置队列和消息的持久化,服务重启后,消息和队列依旧存在只设置队列持久化,服务重启后,队列存在,但消息丢失只设置消息持久化,服务重启后,队列消失,继而消息也会丢失

因为消息是存储在队列中的,如果队列丢失了,那么队列中的消息,无论是否持久化,都会丢失。

二、消息

1.生产消息

1.1 basicPublish()

exchange:交换器名称routingKey:路由键props:消息的基本属性集

contentType:设置消息的内容类型

application/json --> json格式的数据application/xml --> XML格式text/plain --> 纯文本格式… contentEncoding:设置消息的内容编码header:对应交换器为header类型时使用deliveryMode:设置消息的投递模式,可选值为 1(非持久化)或 2(持久化)priority:设置消息的优先级,优先级由低到高为0 到 9,值为0代表最后消费。注意只有对应的队列设置了x-max-priority 参数才会根据优先级进行消费replyTo:设置消息的回复队列。指定一个队列,用于接收消息的回复或响应,消费者可以将回复发送到指定的队列中,使生产者能够获取到回复消息。expiration:设置消息的过期时间messageId:设置消息的唯一标识符timestamp:设置消息的时间戳type:描述消息的类型。消费者可以根据type属性的不同,分类处理不同type的消息 byte[] body:消息体mandatory:true表示交换器根据路由键找不到符合的队列时,会调用basicReturn将消息返回给生产者。false表示这种情况会把消息丢弃immediate:true表示交换机将消息路由到队列上且该队列上没有消费者时,那么该消息不会存入队列,当匹配的所有队列都没有消费者时,会通过basicReturn将消息返回给生产者

2.消费消息

2.1 消费模式

2.1.1 推模式:basicConsumer()

queue:队列的名称autoAck:设置是否自动确认consumerTag:消费者标签。用来区分多个消费者noLocal:为true表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者exclusive:是否排他,消费者独占该队列,其他消费者无法同时访问该队列arguments:其他参数callback:设置回调函数

2.1.2 拉模式:basicGet()

2.2 消费端的确认与拒绝

2.2.1 确认

若消费者在订阅时设置autoAck为true,则发送出去的消息会自动被置为确认,然后删除该消息。若消费者在订阅时设置autoAck为false,则发出的消息后,会等待消费者显式的回复确认信号后再删除该消息。若在显式的回复确认信号前进程挂掉或者其他原因一直未发出,RabbitMQ会一直等待持有消息,等待消费者显式调用Basic.Ack为止若autoAck为false,一直没收到确认信号,并且消费者已经断开连接,则会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者

2.2.2 拒绝

拒绝单条消息:basicReject()

deliveryTag:消息的编号requeue:为true表示将该消息重新存入队列,以便发送给下一个消费者。为false表示把消息从队列中移除 批量拒绝消息:basicNack()

deliveryTagmultiple:false表示拒绝编号为delivery的这一条消息,为true表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息requeue 重新请求未确认的消息:basicRecover()

requeue:消息未被确认的消息重新加入到队列后,为true表示同一条消息会被分配给与之前不同的消费者,为false表示同一条消息会被分配给与之前相同的消费者

3.消息过期时间TTL

3.1 实现方式

设置队列属性。x-message-ttl指定队列中消息的过期时间(以毫秒为单位)。超过指定时间的消息将被自动删除。生产消息时候针对该消息设置过期时间。expiration设置消息的过期时间

3.2 注意事项

如果两种方法一起使用,消息的TTL以两者较小的数值为准消息在队列中的生存时间超过TTL后,就会变成“死信”,不会再被消费者接收在队列属性TTL方式中,消息过期后就会从队列中删除在消息属性TTL方式中,消息过期后不回立即删除,因为消息只有在被投递到消费者的时候才会进行判定

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: