RabbitMQ工作模式
1.简单模式
1.1 模式介绍
一个生产者、一个消费者、不需要设置交换机(使用默认交换机)
1.2 代码测试
生产者
public class ProducerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置初始化参数
// 设置ip 默认 localhost
factory.setHost("192.168.52.128");
// 设置端口 默认5672
factory.setPort(5672);
// 设置虚拟机 默认 /
factory.setVirtualHost("/test");
// 设置用户名密码 默认 guest
factory.setUsername("admin");
factory.setPassword("admin");
// 3.获取连接 Connection
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* public AMQP.Queue.DeclareOk queueDeclare
* (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
* queue 队列名称
* durable 是否持久化
* exclusive 是否独占队列、当Connection关闭时是否删除队列
* autoDelete 是否自动删除、当没有Consumer时,自动删除
*
*/
channel.queueDeclare("hello_world", true, false, false, null);
// 6.发送消息
/**
* void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
* throws IOException;
* exchange 交换机名称。简单模式下交换机会使用默认的""
* routingKey 路由名称
* props 配置信息
* body 发送消息
*/
String body = "Hello RabbitMQ";
channel.basicPublish("","hello_world",null,body.getBytes());
// 7.释放资源
channel.close();
connection.close();
}
}
消费者
public class ConsumerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello_world", true, false, false, null);
// 接受消息
/**
* String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
* queue 队列名称
* autoAck 是否自动确认
* callback 回调
*/
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume("hello_world", true, consumer);
// 不要关闭资源
//channel.close();
//connection.close();
}
}
2.Work queues 工作队列模式
2.1模式介绍
模式介绍:与入门程序的简单模式相比,多了一个或一些消费端,多了消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务的处理速度。
小结:
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。 Work Queues 对于任务过重或任务较多情况使用队列模式可以提高任务处理的速度。例如:短信服务部署多个,只需要一个节点成功发送即可。
2.2 代码测试
一个生产者和两个消费者
都是单独文件main方法执行
创建生产者,循环发送10条不同的消息。
public class ProducerWorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello_world", true, false, false, null);
for (int i = 0; i < 10; i++) {
String body = "Hello RabbitMQ_" + i;
channel.basicPublish("", "work_queues", null, body.getBytes());
}
channel.close();
connection.close();
}
}
创建两个消息者,同时接收消息。
第一个消费者:ConsumerWorkQueues1
public class ConsumerWorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work_queues", true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
// 消费
channel.basicConsume("work_queues", true, consumer);
}
}
第二个消费者:ConsumerWorkQueues2
public class ConsumerWorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work_queues", true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume("work_queues", true, consumer);
}
}
运行结果
3. Pub/Sub 订阅模式
3.1 模式介绍
在订阅模型中,多了一个Exchange角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机 C:消费者,消息的接收者,会一直等待消息到来 Queue:消息队列,接收消息、缓存消息。 Exchange:交换机,一方面,接受生产者发送的消息。另一方面,知道然后处理消息,例如递交给某个特别队列、递交给所有队列或是将消息丢弃。到底如何取决于Exchange的类型,Exchange有以下三种常见类型:
Fanout:广播,将消息交给所有绑定到交换机的队列Direct:定向,把消息交给符合指定routing key的队列Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。
3.2 代码测试
使用一个生产者和两个消费者来模拟日志
生产者:创建一个日志,日志内容:RabbitMQ:这是一条日志消息
消费者1:将日志打印到控制台
消费者2:将日志存储到数据库(模拟)
创建生产者
**1.创建连接工厂 **
ConnectionFactory factory = new ConnectionFactory();
2.设置默认参数
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setUsername("admin");
setHost:设置地址,默认localhost
setPort:设置端口,默认5672
setUsername&setPassword:设置用户名和密码
3.获取连接
Connection connection = factory.newConnection();
4. 获取通道(Channel)
Channel channel = connection.createChannel();
5.创建交换机
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
Exchange.DeclareOk exchangeDeclare (String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map
exchange:交换机名称type: 交换机类型,
DIRECT(“direct”), 定向FANOUT(“fanout”), 广播TOPIC(“topic”), 通配符HEADERS(“headers”); 参数 durable:是否持久化化autoDelete:是否自动删除internal:是否内部使用,默认为falsearguments: 参数,默认null
6.创建队列
String queueName1 = "test_fanout_queue1";
String queueName2 = "test_fanout_queue2";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
7.绑定队列和交换机
channel.queueBind(queueName1, exchangeName, "");
channel.queueBind(queueName2, exchangeName, "");
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
参数
queue: 队列名称exchange: 交换机名称routingKy: 路由key(绑定规则),交换机类型fanout,默认""
8.发送消息
String body = "RabbitMQ:这是一条日志消息";
channel.basicPublish(exchangeName, "", null, body.getBytes());
9.关闭资源
channel.close();
connection.close();
完整代码
public class ProducerPubSub {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置默认参数
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
// 3.获取连接
Connection connection = factory.newConnection();
// 4.获取通道(Channel)
Channel channel = connection.createChannel();
// 5.创建交换机
/**
* Exchange.DeclareOk exchangeDeclare
* (String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map
* throws IOException;
* 参数
* exchange:交换机名称
* type: 交换机类型,
* DIRECT("direct"), 定向
* FANOUT("fanout"), 广播
* TOPIC("topic"), 通配符
* HEADERS("headers"); 参数
* durable:是否持久化化
* autoDelete:是否自动删除
* internal:是否内部使用,默认为false
* arguments: 参数,默认null
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
// 6.创建队列
String queueName1 = "test_fanout_queue1";
String queueName2 = "test_fanout_queue2";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
// 7.绑定队列和交换机
/**
* Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
* queue: 队列名称
* exchange: 交换机名称
* routingKey: 路由key(绑定规则),交换机类型fanout,默认""
*/
channel.queueBind(queueName1, exchangeName, "");
channel.queueBind(queueName2, exchangeName, "");
// 8.发送消息
String body = "RabbitMQ:这是一条日志消息";
channel.basicPublish(exchangeName, "", null, body.getBytes());
// 9.关闭资源
channel.close();
connection.close();
}
}
创建消费者:ConsumerPubSub1,使用队列test_fanout_queue1
public class ConsumerPubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("控制台打印:" + new String(body));
}
};
// 消费
channel.basicConsume("test_fanout_queue1", true, consumer);
}
}
创建消费者:ConsumerPubSub2,使用队列test_fanout_queue2
public class ConsumerPubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("数据存储:" + new String(body));
}
};
// 消费
channel.basicConsume("test_fanout_queue2", true, consumer);
}
}
测试
4.Routing 路由模式
4.1 模式介绍
队列与交换机绑定,不是任意的绑定了,而是指定一个RoutingKey(路由key)消息的发送方向在Exchange 发送消息时,也必须指定消息的的RoutingKeyExchange不在将消息交给每一个绑定的队列,而是根据消息的RoutingKey,进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息
4.2 代码测试
和Pub/Sub 订阅者模式一样,主要区别是交换策略和RoutingKey
生产者
public class ProducerRouting {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置默认参数
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
// 3.获取连接
Connection connection = factory.newConnection();
// 4.获取通道(Channel)
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 5.创建交换机,模式:DIRECT
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// 6.创建队列
String queueName1 = "test_direct_queue1";
String queueName2 = "test_direct_queue2";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
// 7.绑定队列和交换机
// 这里,queueName1表示存储的数据库(模拟),queueName2表示控制台打印
channel.queueBind(queueName1, exchangeName, "error");
channel.queueBind(queueName2, exchangeName, "info");
channel.queueBind(queueName2, exchangeName, "error");
channel.queueBind(queueName2, exchangeName, "warning");
// 8.发送消息
String body = "RabbitMQ:这是一条日志消息,级别:warning";
channel.basicPublish(exchangeName, "warning", null, body.getBytes());
// 9.关闭资源
channel.close();
connection.close();
}
}
消费者1(数据存储)
public class ConsumerRouting1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("数据存储:" + new String(body));
}
};
// 消费
channel.basicConsume("test_direct_queue1", true, consumer);
}
}
消费者2(控制台打印)
public class ConsumerRouting2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("控制台打印:" + new String(body));
}
};
// 消费
channel.basicConsume("test_direct_queue2", true, consumer);
}
}
测试
5. Topics统配符模式
5.1 模式介绍
与Roting模式相似,不同之处是根据RoutingKey的不同,有不同的策略。
例如:
#.error:表示后缀为.error的RoutingKey消息
order.#:表示前缀为order.的RoutingKey消息
*.error:表示后缀为.error的RoutingKey消息,但*表示一个单词。sys.user.error则无法匹配;同样,前缀一样。
Topic主题模式可以实现Pub/Sub发布域订阅模式和Routing路由模式的功能,只是Topic在配置RoutingKey时候可以使用统配符,显得更加灵活。
5.2 代码测试
策略描述:一个生产者生产日志消息,同时两个消费者获取消息。
消费者1:存储到数据库(模拟)
消费者2:打印到控制台
限制:测试设置说明,所有error级别的日志存入数据库,所有order系统级别的日志存入数据库
生产者
public class ProducerTopics {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置默认参数
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
// 3.获取连接
Connection connection = factory.newConnection();
// 4.获取通道(Channel)
Channel channel = connection.createChannel();
String exchangeName = "test_topics";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
// 6.创建队列
String queueName1 = "test_topics_queue1";
String queueName2 = "test_topics_queue2";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
// 7.绑定队列和交换机
// 设置说明,所有error级别的日志存入数据库,所有order系统级别的日志存入数据库
channel.queueBind(queueName1, exchangeName, "#.error");
channel.queueBind(queueName1, exchangeName, "order.*");
channel.queueBind(queueName2, exchangeName, "*.*");
// 8.发送消息
String body = "RabbitMQ:这是一条日志消息,级别:order.error";
channel.basicPublish(exchangeName, "order.error", null, body.getBytes());
// 9.关闭资源
channel.close();
connection.close();
}
}
消费者1
public class ConsumerTopics1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("数据存储:" + new String(body));
}
};
// 消费
channel.basicConsume("test_topics_queue1", true, consumer);
}
}
消费者2
public class ConsumerTopics2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.52.128");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("控制台打印:" + new String(body));
}
};
// 消费
channel.basicConsume("test_topics_queue2", true, consumer);
}
}
测试结果
参考阅读
发表评论