微服务 · 消息队列 · RabbitMQ

目录

-- 基本概念

-- 安装方式

-- 基本使用

-- 五种消息队列

-- SpringBoot整合RabbitMQ

-- 总结

消息队列-基本概念

1. 消息队列是什么

可以说消息队列是一个放消息的容器 ,它是服务与服务之间数据通信的机制,以先进先出的方式实现通信,其中还采用了生产者消费者模式。

1. 消息队列的作用

解耦 这里的解耦是指在服务与服务之间的业务中,服务B可以不直接调用生产者而是通过 消息队列 ,各自不必考虑如何处理,使得代码的维护性得到了提高。 异步

同步:多个业务排队执行 异步:多个业务同时执行 将数据放到消息队列后即撤出,剩下的交给消息队列。 消峰 消峰是指在流量太大时可以通过控制消息队列的长度来限制同时处理的消息数量,从而达到限流保护服务器的作用。

消息队列的缺点:

1. 提高系统的复杂性

1. 降低系统的可用性

3. 主流的消息队列

主流的MQ:

Active MQRabbit MQ(延迟最低)Rocket MQKafka(吞吐量最高)

四者对比图

4. 消息队列的基本概念

前者后者生产者向消息队列发送消息服务消费者从消息队列取消息的服务队列 queue存放消息的容器,采用FIFO数据结构交换机 exchange实现消息路由,将消息分发到对应的队列中消息服务器 Broker进行消息通信的软件平台服务器虚拟主机 virtual host类似namespace ,将不同用户的交换机和队列分开来链接 connection网络连接通道 channel数据通信的通道

安装 RabbitMQ

1. Linux安装

1)安装erlang

wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm

rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

yum install epel-release

yum install erlang

2) 安装rabbitmq 目前的最新版本 支持erlang24

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.7/rabbitmq-server-3.9.7-1.el7.noarch.rpm

yum install rabbitmq-server-3.9.7-1.el7.noarch.rpm

3) 启动rabbitmq

service rabbitmq-server start

4) 启用管理工具

rabbitmq-plugins enable rabbitmq_management

5) 防火墙允许端口

firewall-cmd --permanent --add-port=15672/tcp

firewall-cmd --permanent --add-port=5672/tcp

6) 提示不能使用localhost登录,添加远程登录的用户

rabbitmqctl add_user admin admin

rabbitmqctl set_user_tags admin administrator

7) 设置开机启动输入下面命令

chkconfig rabbitmq-server on

2. Windows安装

1)下载erlang和RabbitMQ安装包 2)安装erlang 3)安装rabbit MQ 4)打开菜单输入命令,启动管理工具

rabbitmq-plugins enable rabbitmq_management

5)启动rabbit MQ

net start rabbitmq

net stop rabbitmq

6)浏览器输入: http://localhost:15672 账号密码都是guest

RabbitMQ 的基本使用

1. 添加用户

​ 不同的系统可以使用各自的用户登录RabbitMQ,可以在Admin的User页面添加新用户

2. 添加虚拟主机

​ 虚拟主机相当于一个独立的MQ服务,有自身的队列、交换机、绑定策略等。

3. 添加队列

​ 不同的消息队列保存不同类型的消息,如支付消息、秒杀消息、数据同步消息等。

​ 添加队列,需要填写虚拟主机、类型、名称、持久化、自动删除和参数等。

4. 添加交换机

​ 生产者将消息发送到交换机Exchange,再由交换机路由到一个或多个队列中; ​ 交换器的类型有fanout、direct、topic、headers这四种,下篇文章将详细介绍。

Rabbit MQ的五种消息模型

1. 一对一模型

操作步骤

启动RabbitMQ,在管理页面中创建用户admin使用admin登录,让后创建虚拟主机myhost

​ – 创建队列的配置如下:

按例代码

导入依赖

com.rabbitmq

amqp-client

3.4.1

开发工具类

public class MQUtils {

public static final String QUEUE_NAME = "myqueue01";

public static final String QUEUE_NAME2 = "myqueue02";

public static final String EXCHANGE_NAME = "myexchange01";

public static final String EXCHANGE_NAME2 = "myexchange02";

public static final String EXCHANGE_NAME3 = "myexchange03";

/**

* 获得MQ的连接

* @return

* @throws IOException

*/

public static Connection getConnection() throws IOException {

ConnectionFactory connectionFactory = new ConnectionFactory();

//配置服务器名、端口、虚拟主机名、登录账号和密码

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("myhost");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("123456");

return connectionFactory.newConnection();

}

}

开发生产者

/**

* 生产者,发送简单的消息到队列中

*/

public class SimpleProducer {

public static void main(String[] args) throws IOException {

Connection connection = MQUtils.getConnection();

//创建通道

Channel channel = connection.createChannel();

//定义队列

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

String msg = "Hello World!";

//发布消息到队列

channel.basicPublish("",MQUtils.QUEUE_NAME,null,msg.getBytes());

channel.close();

connection.close();

}

}

运行生产者代码,管理页面点进myqueue01,在GetMessages中可以看到消息

2. 工作队列模型

按例代码能者多劳

3. 发布-订阅模型

操作步骤按例代码

4. 路由模型

操作步骤按例代码

5. 路由模型

操作步骤按例代码

6. 主题模型

按例代码

SpringBoot整合RabbitMQ

创建两个SpringBoot项目,一个作为生产者,一个作为消费者; ​ 生产者会发送两种消息:保存课程(更新和添加)、删除课程 ​ 消费者监听两个队列:保存课程队列和删除课程队列 给生产者和消费者服务添加依赖

org.springframework.boot

spring-boot-starter-amqp

给生产者和消费者服务添加配置

spring:

rabbitmq:

host: localhost

port: 5672

username: admin

password: 123456

virtual-host: myhost

生产者的配置,用于生成消息队列和交换机

/**

* RabbitMQ的配置

*/

@Configuration

public class RabbitMQConfig {

public static final String QUEUE_COURSE_SAVE = "queue.course.save";

public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";

public static final String KEY_COURSE_SAVE = "key.course.save";

public static final String KEY_COURSE_REMOVE = "key.course.remove";

public static final String COURSE_EXCHANGE = "edu.course.exchange";

@Bean

public Queue queueCourseSave() {

return new Queue(QUEUE_COURSE_SAVE);

}

@Bean

public Queue queueCourseRemove() {

return new Queue(QUEUE_COURSE_REMOVE);

}

@Bean

public TopicExchange topicExchange() {

return new TopicExchange(COURSE_EXCHANGE);

}

@Bean

public Binding bindCourseSave() {

return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);

}

@Bean

public Binding bindCourseRemove() {

return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);

}

}

生产者发送消息的核心代码

@Autowired

RabbitTemplate rabbitTemplate;

//发消息的代码

rabbitTemplate.convertAndSend(交换机的名称,消息的key,消息内容);

消费者添加监听器

@Slf4j

@Component

public class CourseMQListener {

public static final String QUEUE_COURSE_SAVE = "queue.course.save";

public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";

public static final String KEY_COURSE_SAVE = "key.course.save";

public static final String KEY_COURSE_REMOVE = "key.course.remove";

public static final String COURSE_EXCHANGE = "course.exchange";

/**

* 监听课程添加操作

*/

@RabbitListener(bindings = {

@QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),

exchange = @Exchange(value = COURSE_EXCHANGE,

type = ExchangeTypes.TOPIC,

ignoreDeclarationExceptions = "true")

, key = KEY_COURSE_SAVE)})

public void receiveCourseSaveMessage(String message) {

try {

log.info("课程添加:{}",message);

} catch (Exception ex) {

ex.printStackTrace();

}

}

/**

* 监听课程删除操作

*/

@RabbitListener(bindings = {

@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),

exchange = @Exchange(value = COURSE_EXCHANGE,

type = ExchangeTypes.TOPIC,

ignoreDeclarationExceptions = "true")

, key = KEY_COURSE_REMOVE)})

public void receiveCourseDeleteMessage(Long id) {

try {

log.info("课程删除完成:{}",id);

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

总结

消息队列是分布式系统的重要组件,起到的作用有:

解耦,生产者和消费者不需要知道对方的具体接口异步,生产者发送完消息直接结束,不需要等待消费者执行完,效率高削峰,控制高峰期消息的数量,降低服务器压力

RabbitMQ的消息模型有:

一对一,一个生产者一个队列一个消费者,一个发一个收 一对多,一个生产者一个队列多个消费者,多个消费者共享一个队列中的消息 发布订阅模式 由交换机绑定多个队列,消息分发到多个队列,每个消费者消费自己的队列中的消息 路由模式 在发布订阅模式的基础上,加入路由键,消息通过键路由到不同的队列 主题模式 在路由模式基础上,键中加入通配符,实现更加灵活的匹配 ex.printStackTrace();

}

} }

总结

* 消息队列是分布式系统的重要组件,起到的作用有:

1. 解耦,生产者和消费者不需要知道对方的具体接口

2. 异步,生产者发送完消息直接结束,不需要等待消费者执行完,效率高

3. 削峰,控制高峰期消息的数量,降低服务器压力

* RabbitMQ的消息模型有:

1. 一对一,一个生产者一个队列一个消费者,一个发一个收

2. 一对多,一个生产者一个队列多个消费者,多个消费者共享一个队列中的消息

3. 发布订阅模式

由交换机绑定多个队列,消息分发到多个队列,每个消费者消费自己的队列中的消息

4. 路由模式

在发布订阅模式的基础上,加入路由键,消息通过键路由到不同的队列

5. 主题模式

在路由模式基础上,键中加入通配符,实现更加灵活的匹配

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: