一、部署一个三节点集群

下面的链接是最快最简单的一种集群部署方法 3分钟部署一个RabbitMQ集群 上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。 每个容器应用都映射了宿主机的端口,分别是5602,5612,5622 docker compse文件如下

version: '3'

services:

stats:

image: bitnami/rabbitmq

environment:

- RABBITMQ_NODE_TYPE=stats

- RABBITMQ_NODE_NAME=rabbit@stats

- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3

ports:

- '15672:15672'

- '5602:5672'

volumes:

- 'rabbitmqstats_data:/bitnami/rabbitmq/mnesia'

queue-disc1:

image: bitnami/rabbitmq

environment:

- RABBITMQ_NODE_TYPE=queue-disc

- RABBITMQ_NODE_NAME=rabbit@queue-disc1

- RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats

- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3

ports:

- '5612:5672'

volumes:

- 'rabbitmqdisc1_data:/bitnami/rabbitmq/mnesia'

queue-ram1:

image: bitnami/rabbitmq

environment:

- RABBITMQ_NODE_TYPE=queue-ram

- RABBITMQ_NODE_NAME=rabbit@queue-ram1

- RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats

- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3

ports:

- '5622:5672'

volumes:

- 'rabbitmqram1_data:/bitnami/rabbitmq/mnesia'

volumes:

rabbitmqstats_data:

driver: local

rabbitmqdisc1_data:

driver: local

rabbitmqram1_data:

driver: local

通过docker-compose up命令,就可以启动三个集群的容器了

[root@localhost mycompose]# docker-compose up

二、配置文件

原来的单节点只配置host和port,现在集群节点,就要配置addresses了,如下所示:

server:

port: 8080

spring:

application:

name: rabbitmq-demo

#配置rabbitMq 服务器

rabbitmq:

#单节点直接可以写host和port

# host: 192.168.56.201

# port: 5672

#集群连接写ip和端口

addresses: 192.168.56.202:5602,192.168.56.202:5612,192.168.56.202:5622

username: user

password: bitnami

#虚拟host

virtual-host: virtual01

template:

mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息

publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功

publisher-returns: true #是否开启生产者returns

listener:

simple:

acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法

prefetch: 10 #每个消费者可拉取的,还未ack的消息数量

concurrency: 3 #消费端(每个Listener)的最小线程数

max-concurrency: 10 #消费端(每个Listener)的最大线程数

三、代码

生产者

和单节点的发送和消费代码一致,没有变化

@Slf4j

@RestController

@RequestMapping("/rabbit")

public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

private static final String EXCHANGE_NAME = "my_exchange";

private static final String ROUTING_KEY = "my_routing";

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 正常发送并被broker接收

* @return

*/

@RequestMapping("send")

public String send() {

for (int i = 0; i < 10; i++) {

OrderInfo orderInfo = new OrderInfo();

orderInfo.setAddress("成都市高新区");

orderInfo.setOrderId(String.valueOf(i));

orderInfo.setProductName("华为P60:" + i);

//设置回调关联的一个id

String messageId = UUID.randomUUID().toString();

log.info("开始发送消息,当前消息关联id为:{}", messageId);

CorrelationData correlationData = new CorrelationData(messageId);

MessageProperties messageProperties = new MessageProperties();

messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))

.andProperties(messageProperties).build();

//设置ack回调

rabbitTemplate.setConfirmCallback(this);

//退回消息的回调

rabbitTemplate.setReturnCallback(this);

rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);

}

return "ok";

}

/**

* 设置一个非法的路由键,模拟消息被broker退回的情况,前提是

* spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息

*

* spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功

*

* @return

*/

@RequestMapping("send-return")

public String sendAndReturn() {

OrderInfo orderInfo = new OrderInfo();

orderInfo.setAddress("成都市高新区");

orderInfo.setOrderId("111");

orderInfo.setProductName("小米13");

//设置回调关联的一个id

String messageId = UUID.randomUUID().toString();

log.info("开始发送消息,当前消息关联id为:{}", messageId);

CorrelationData correlationData = new CorrelationData(messageId);

MessageProperties messageProperties = new MessageProperties();

messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))

.andProperties(messageProperties).build();

//设置ack回调

rabbitTemplate.setConfirmCallback(this);

//退回消息的回调

rabbitTemplate.setReturnCallback(this);

//下面这个RoutingKey是没有绑定的,所以发不出去

rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData);

return "ok";

}

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if (correlationData == null) {

return;

}

String messageId = correlationData.getId();

if (ack) {

log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId);

} else {

log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId);

}

}

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}",

new String(message.getBody()), replyCode, replyText, exchange, routingKey);

}

}

消费者

@Slf4j

@Component

public class RabbitOrderConsumer {

private static final String EXCHANGE_NAME = "my_exchange";

private static final String QUEUE_NAME = "my_queue";

private static final String ROUTING_KEY = "my_routing";

@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),

exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})

public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

//上面这个tag是这么写的么,为什么每次传过来都是1?导致channel被重新创建

log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);

channel.basicAck(tag, false);

}

}

访问地址:http://localhost:8080/rabbit/send,然后就可以发送消息了,输出日志如下:

开始发送消息,当前消息关联id为:18049efe-a624-4288-a8f0-9c28fd776773

开始发送消息,当前消息关联id为:83d93f90-62f4-41cf-af02-03d496812561

开始发送消息,当前消息关联id为:f83257b2-95b6-408e-a5b9-74d0ec9f30b0

开始发送消息,当前消息关联id为:16a7e471-23ba-408b-9095-6add9ad1e270

开始发送消息,当前消息关联id为:152b0fb0-3a22-452d-93fe-662252c2fd8c

开始发送消息,当前消息关联id为:ade4f703-6075-485f-8e34-ec9b95bf59de

开始发送消息,当前消息关联id为:e4511f82-476a-4f4c-b704-4399baadeaf4

接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1

接收到消息:{"orderId":"0","productName":"华为P60:0","address":"成都市高新区"},deliveryTag:1

开始发送消息,当前消息关联id为:d8cd2dd6-bb9e-4d46-bc42-0d96df70748f

开始发送消息,当前消息关联id为:76950a93-5887-43c1-adef-edc1e29e2fab

开始发送消息,当前消息关联id为:f08a7a68-60da-4c5d-b1b8-c9e4d9453969

【confirm回调方法】,消息发布成功,messageId=18049efe-a624-4288-a8f0-9c28fd776773

【confirm回调方法】,消息发布成功,messageId=83d93f90-62f4-41cf-af02-03d496812561

接收到消息:{"orderId":"3","productName":"华为P60:3","address":"成都市高新区"},deliveryTag:2

接收到消息:{"orderId":"2","productName":"华为P60:2","address":"成都市高新区"},deliveryTag:1

接收到消息:{"orderId":"6","productName":"华为P60:6","address":"成都市高新区"},deliveryTag:3

接收到消息:{"orderId":"5","productName":"华为P60:5","address":"成都市高新区"},deliveryTag:2

接收到消息:{"orderId":"9","productName":"华为P60:9","address":"成都市高新区"},deliveryTag:4

接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2

接收到消息:{"orderId":"7","productName":"华为P60:7","address":"成都市高新区"},deliveryTag:3

接收到消息:{"orderId":"8","productName":"华为P60:8","address":"成都市高新区"},deliveryTag:3

【confirm回调方法】,消息发布成功,messageId=f83257b2-95b6-408e-a5b9-74d0ec9f30b0

【confirm回调方法】,消息发布成功,messageId=16a7e471-23ba-408b-9095-6add9ad1e270

【confirm回调方法】,消息发布成功,messageId=152b0fb0-3a22-452d-93fe-662252c2fd8c

【confirm回调方法】,消息发布成功,messageId=ade4f703-6075-485f-8e34-ec9b95bf59de

【confirm回调方法】,消息发布成功,messageId=e4511f82-476a-4f4c-b704-4399baadeaf4

【confirm回调方法】,消息发布成功,messageId=d8cd2dd6-bb9e-4d46-bc42-0d96df70748f

【confirm回调方法】,消息发布成功,messageId=76950a93-5887-43c1-adef-edc1e29e2fab

【confirm回调方法】,消息发布成功,messageId=f08a7a68-60da-4c5d-b1b8-c9e4d9453969

上述代码仓库:https://gitee.com/syk1234/mqdmo

四、后台管理

登录管理后台页面:http://192.168.56.202:15672/

共有三个节点,两个磁盘节点,一个内存节点。如果你还不清楚什么是磁盘节点,什么是内存节点,可以参考【RabbitMQ 实战】08 集群原理剖析

查看连接情况,发现是连接的是节点rabbit@stats节点 查看队列的情况,队列是在rabbit@stats节点上

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: