RabbitMQ主题模式(通配符模式)

前言什么是Topic模式使用Topic模式的要点通配符规则示例

代码示例Pom文件引入RabbtiMQ依赖RabbitMQ工具类生产者消费者1消费者2效果

总结

前言

通过本篇博客能够简单使用RabbitMQ的主题模式。 本篇博客主要是博主通过官网总结出的RabbitMQ主题模式。其中如果有误欢迎大家及时指正。

什么是Topic模式

Topic模式与Direct模式相比,他们都可以根据Routing key把消息路由到对应的队列上,但是Topic模式相较于Direct来说,它可以基于多个标准进行路由。也就是在队列绑定Routing key的时候使用通配符。这使我们相较于Direct模式灵活性更大。

使用Topic模式的要点

routing key必须是由"."进行分隔的单词列表,最大限制为255字节

通配符规则

"*"可以代替一个单词。"#"可以代替零个或多个单词。

示例

创建了三个绑定:Q1绑定了绑定键“.orange”。和Q2的".*.rabbit"和“lazy.#”。

1.一个消息的路由键为"quick.orange.rabbit" 时,它将会被送到队列Q1和Q2。 2.一个消息的路由键为"quick.orange.fox"时,它将会背诵到队列Q1 3.一个消息的路由键为"lazy.brown.fox"时,它将被送到队列Q2 4.一个消息的路由键为"quick.brown.fox",没有匹配任何队列,消息将会丢失。 5.一个消息的路由键为"lazy.orange.new.rabbit",它将被送到队列Q2. 6.一个消息的路由键为"orang"或者"quick.orange.new.rabbit"没有匹配到任何队列消息将丢失。

代码示例

Pom文件引入RabbtiMQ依赖

com.rabbitmq

amqp-client

5.10.0

RabbitMQ工具类

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @author : [WangWei]

* @version : [v1.0]

* @className : RabbitMQUtils

* @description : [rabbitmq工具类]

* @createTime : [2023/1/17 8:49]

* @updateUser : [WangWei]

* @updateTime : [2023/1/17 8:49]

* @updateRemark : [描述说明本次修改内容]

*/

public class RabbitMQUtils {

/*

* @version V1.0

* Title: getConnection

* @author Wangwei

* @description 创建rabbitmq连接

* @createTime 2023/1/17 8:52

* @param []

* @return com.rabbitmq.client.Connection

*/

public static Connection getConnection() throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("ip");

factory.setPort(5672);

factory.setVirtualHost("虚拟主机");

factory.setUsername("用户名");

factory.setPassword("密码");

//创建连接

Connection connection=factory.newConnection();

return connection;

}

/*

* @version V1.0

* Title: getChannel

* @author Wangwei

* @description 创建信道

* @createTime 2023/1/17 8:55

* @param []

* @return com.rabbitmq.client.Channel

*/

public static Channel getChannel() throws IOException, TimeoutException {

Connection connection=getConnection();

Channel channel=connection.createChannel();

return channel;

}

}

生产者

import com.rabbitmq.client.Channel;

import java.io.IOException;

import java.nio.charset.StandardCharsets;

import java.util.concurrent.TimeoutException;

/**

* @author : [WangWei]

* @version : [v1.0]

* @className : Producer

* @description : [生产者]

* @createTime : [2023/2/1 9:38]

* @updateUser : [WangWei]

* @updateTime : [2023/2/1 9:38]

* @updateRemark : [描述说明本次修改内容]

*/

public class Producer {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {

//建立连接

RabbitMQUtils.getConnection();

//声明通道

Channel channel = RabbitMQUtils.getChannel();

//创建topic类型交换机并命名为logs

channel.exchangeDeclare(EXCHANGE_NAME,"topic");

//声明routingKey

String severityInfo="info.log.test";

String severityError="error.test";

String severityError2="log.error.test";

//循环发送2条消息

for (int i = 0; i <2 ; i++) {

String msg="info.log.test:"+i;

/*推送消息

*交换机命名,不填写使用默认的交换机

* routingKey -路由键-

* props:消息的其他属性-路由头等正文

* msg消息正文

*/

channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));

System.out.println(msg);

}

//循环发送2条消息

for (int i = 0; i <2 ; i++) {

String msg="主题模式error.test:"+i;

/*推送消息

*交换机命名,不填写使用默认的交换机

* routingKey -路由键-

* props:消息的其他属性-路由头等正文

* msg消息正文

*/

channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));

System.out.println(msg);

}

//循环发送2条消息

for (int i = 0; i <2 ; i++) {

String msg="log.error.test:"+i;

/*推送消息

*交换机命名,不填写使用默认的交换机

* routingKey -路由键-

* props:消息的其他属性-路由头等正文

* msg消息正文

*/

channel.basicPublish(EXCHANGE_NAME,severityError2,null,msg.getBytes(StandardCharsets.UTF_8));

System.out.println(msg);

}

}

}

消费者1

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @author : [WangWei]

* @version : [v1.0]

* @className : ConsumerOne

* @description : [消费者1]

* @createTime : [2023/2/1 9:39]

* @updateUser : [WangWei]

* @updateTime : [2023/2/1 9:39]

* @updateRemark : [描述说明本次修改内容]

*/

public class ConsumerOne {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {

RabbitMQUtils.getConnection();

Channel channel = RabbitMQUtils.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME,"topic");

String queueName = channel.queueDeclare().getQueue();

//声明routingKey (error)

String severityError="error.*";

//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失

//queueName绑定了direct_logs交换机并且绑定了routingKey

channel.queueBind(queueName, EXCHANGE_NAME,severityError );

//因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received '" + message + "'");

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

}

}

消费者2

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @author : [WangWei]

* @version : [v1.0]

* @className : ConsumerTwo

* @description : [消费者2]

* @createTime : [2023/2/1 9:38]

* @updateUser : [WangWei]

* @updateTime : [2023/2/1 9:38]

* @updateRemark : [描述说明本次修改内容]

*/

public class ConsumerTwo {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {

RabbitMQUtils.getConnection();

Channel channel = RabbitMQUtils.getChannel();

//创建fanout类型交换机并命名为logs

channel.exchangeDeclare(EXCHANGE_NAME,"topic");

//创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称

String queueName = channel.queueDeclare().getQueue();

//声明routingKey (info,error,warning)

String severityInfo="info.#";

String severityError="*.error.*";

//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失

//queueName绑定了direct_logs交换机并且绑定了3个routingKey

channel.queueBind(queueName, EXCHANGE_NAME,severityInfo );

channel.queueBind(queueName, EXCHANGE_NAME,severityError );

//因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received '" + message + "'");

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

}

}

效果

总结

通过使用通配符实现灵活性的应用有很多,例如nginx的请求转发,gateway为请求过滤等等都是使用了统配符的技术。通过这种联想来对知识进行结构化,找相同和不同,思考能力和学习力也会有很大的提高。

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: