RabbitMQ

一、什么是中间件?二、中间件技术及架构的概述三、基于消息中间件的分布式系统架构1.消息中间件的应用场景2.常见的消息中间件3.消息中间件的本质及设计4.消息中间件的核心组成部分

四、消息队列协议1. 什么是协议2. AMQP协议3. MQTT协议4. OpenMessage协议5. Kafka协议

五、消息队列持久化六、消息的分发策略七、高可用&高可靠1. 高可用2. 高可靠

八、 RabbitMQ入门及安装1. 安装RabbitMQ2. Web管理界面及授权3. Docker安装RabbitMQ

九、原理讲解1. AMQP协议2. 核心组成部分

十、 RabbitMQ模式1. 简单模式2. 四种 exchage1)fanout类型(发布与订阅模式)2)Direct类型(Routing模式)3)topic类型(主题模式)4)Headers类型 【几乎不用】

3. Work模式1)轮询模式2)公平模式

十一、使用场景十二、SpringBoot整合1. fanout类型2. direct类型3. topic类型

十三、RabbitMQ高级1. 过期时间TTL2. 死信队列

十四、运行监控1. 内存控制2. 磁盘预警3. 内存换页

十五、集群搭建十六、分布式事务概念十七、分布式事务实现1. 系统间调用de事务回滚问题2. 分布式事务整体设计思路3. 解决可靠生产问题-定时重发4. 可靠消费

十八、集群配置详解附录

感谢飞哥的分享:B站视频地址

参考文章:消息队列–RabbitMq

源码gitee地址:https://gitee.com/wxawangxinan/rabbit-mq.git

一、什么是中间件?

1、简介

中间件(Middleware):是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即 中间件 = 平台 + 通信。这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与 支撑软件和实用软件区分开来。

2、举例

RMI(Remote Method Invocations, 远程调用)Load Balancing(负载均衡,将访问负荷分散到各个服务器中)Transparent Fail-over(透明的故障切换)Clustering(集群,用多个小的服务器代替大型机)Back-end-Integration(后端集成,用现有的、新开发的系统如何去集成遗留的系统)Transaction事务(全局/局部)全局事务(分布式事务)局部事务(在同一数据库联接内的事务)Dynamic Redeployment(动态重新部署,在不停止原系统的情况下,部署新的系统)System Management(系统管理)Threading(多线程处理)Message-oriented Middleware面向消息的中间件(异步的调用编程)Component Life Cycle(组件的生命周期管理)Resource pooling(资源池)Security(安全)Caching(缓存) 满足中间件特性的都可以称作中间件,例如:rabbitMQ,Nginx,MySQL,Redis

为什么使用消息中间件

具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担。中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。

中间件特点

为解决分布异构问题,人们提出了中间件(middleware)的概念。中间件是位于平台(硬件和操作系统)和应用之间的通用服务,如下图所示,这些服务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,它们可以有符合接口和协议规范的多种实现。 很难给中间件一个严格的定义,但中间件应具有如下的一些特点: (1)满足大量应用的需要 (2)运行于多种硬件和OS平台 (3)支持分布计算,提供跨网络、硬件和OS平台的透明性的应用或服务的交互 (4)支持标准的协议 (5)支持标准的接口

由于标准接口对于可移植性和标准协议对于互操作性的重要性,中间件已成为许多标准化工作的主要部分。对于应用软件开发,中间件远比操作系统和网络服务更为重要,中间件提供的程序接口定义了一个相对稳定的高层应用环境,不管底层的计算机硬件和系统软件怎样更新换代,只要将中间件升级更新,并保持中间件对外的接口定义不变,应用软件几乎不需任何修改,从而保护了企业在应用软件开发和维护中的重大投资。

二、中间件技术及架构的概述

知识路线

学习技巧

1:理解中间件在项目架构中的作用,以及各中间件的底层实现。 2:可以使用一些类比的生活概念去理解中间件, 3:使用一些流程图或者脑图的方式去梳理各个中间件在架构中的作用 4:尝试用java技术去实现中间件的原理 5:静下来去思考中间件在项目中设计的和使用的原因

三、基于消息中间件的分布式系统架构

从上图中看,消息中间件的作用是: 1:利用可靠的消息传递机制,进行系统和系统直接的通讯 2:通过提供消息传递和消息的排队机制,它可以在分布式系统环境下扩展进程间的通讯。

1.消息中间件的应用场景

跨系统数据传递高并发的流量削峰(将某些串行操作,改为并行操作)数据的分发和异步处理大数据分析与传递分布式事务

比如有一个数据要进行迁移或者请求并发过多的时候(10W的并发请求下订单),可以在这些订单入库之前,把订单请求堆积到消息队列中,让它稳健可靠的入库和执行。

2.常见的消息中间件

ActiveMQ、RabbitMQ、Kafka、RocketMQ等

3.消息中间件的本质及设计

它是一种 接收数据、接受请求,存储数据,发送数据 等功能的技术服务。 MQ消息队列:负责数据的接收、存储和传递,所以性能要优于普通服务和技术

4.消息中间件的核心组成部分

消息的协议消息的持久化机制消息的分发策略消息的高可用、高可靠消息的容错机制

四、消息队列协议

1. 什么是协议

消息中间件负责数据的传递、存储、和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范(采用底层的TCP/IP,UDP协议或其他协议)约定成俗的规范就是:协议

所谓协议是指: 1:计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。 2:和一般的网络应用程序不同,它主要负责数据的接受和传递,所以性能比较的高 3:计算机之间交换数据和格式,都必须严格遵守规范。

网络协议三要素:

语法:用户数据与控制信息的结构与格式,以及数据出现的顺序。语义:解释控制信息每个部分的意义。它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应。时序:时序是对事件发生顺序的详细说明。

比如我MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的响应结果和反馈是什么,然后按照对应的执行顺序进行处理。

以http请求协议为例: 1:语法:http规定了请求报文和响应报文的格式。 2:语义:客户端主动发起请求称之为请求。(这是一种定义,同时你发起的是post/get请求) 3:时序:一个请求对应一个响应。(一定先有请求在有响应,这个是时序)

消息中间件并不采用http协议。常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。

面试题:为什么消息中间件不直接使用http协议呢? 1.因为http请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码、响应码等附加的功能。但是对于一个消息而言,并不需要这么复杂,也没这个必要,它其实就是负责数据传递、存储、分发就行。首先追求的是高性能,尽量简洁,快速。 2.大部分情况下http都是短链接。在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会进行持久化,造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息进行持久化等,目的是为了保证消息和数据的高可靠。

2. AMQP协议

AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个应用层高级消息队列协议,提供统一消息服务,为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。 特性: 1:分布式事务支持。 2:消息的持久化支持。 3:高性能和高可靠的消息处理优势。

AMQP协议的支持者:

3. MQTT协议

MQTT协议:(Message Queueing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。 特点: 轻量、结构简单、传输快,不支持事务、没有持久化设计。 应用场景: 适用于计算能力有限、低带宽、网络不稳定的场景。

4. OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。 特点: 结构简单、解析速度快、支持事务和持久化设计

5. Kafka协议

Kafka协议是基于 TCP/IP 的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。 特点是: 结构简单、解析速度快、无事务支持、有持久化设计

五、消息队列持久化

常见的持久化方式:

持久化消息的保存过程: 非持久化消息: 非持久消息:是指当内存不够用的时候,会把消息和数据转移到磁盘,但是重启以后非持久化队列消息就丢失。 持久化分类: 1:队列持久化 2:消息持久化 3:交换机持久化

不论是持久化的消息还是非持久化的消息都可以写入到磁盘中,只不过非持久的是等内存不足的情况下才会被写入到磁盘中

持久化队列

代码创建持久化队列

// 参数1:名字

// 参数2:是否持久化[即durable=true。持久化的队列在web控制台中有一个D 的标记]

// 参数3:独占的queue,

// 参数4:不使用时是否自动删除,

// 参数5:其他参数

channel.queueDeclare(queueName,true,false,false,null);

web端创建持久化队列 【注】重启rabbit-server服务,持久化队列依然在,而非持久队列会丢失!

消息持久化 通过属性 deliveryMode 来设置消息是否持久化,在发送消息时通过basicPublish的参数传入:

// 参数1:交换机的名字

// 参数2:队列或者路由key

// 参数3:是否进行消息持久化

// 参数4:发送消息的内容

channel.basicPublish(exchangeName, routingKey1, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

交换机持久化 和队列一样,交换机也需要在定义的时候设置持久化,否则在rabbit-server服务重启以后将丢失

// 参数1:交换机的名字

// 参数2:交换机的类型,topic/direct/fanout/headers

// 参数3:是否持久化

channel.exchangeDeclare(exchangeName,exchangeType,true);

六、消息的分发策略

MQ消息队列的角色:生产者、存储消息、消费者

生产者生成消息,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎 推(push)或者 拉(pull)两种方式。典型的git就有推拉机制,http请求就是一种典型的拉取数据库数据返回的过程。消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景,也有很多对应推机制策略。

消息分发策略的机制和对比

ActiveMQRabbitMQKafkaRocketMQ发布订阅支持支持支持支持轮询分发支持支持支持/公平分发/支持支持/重发支持支持/支持消息拉取/支持支持支持

七、高可用&高可靠

1. 高可用

高可用:是指产品在规定的条件和时间内,处于可执行状态的能力 当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU、内存、磁盘)的极限,无法满足业务的需求。所以消息中间件必须支持集群部署,达到高可用的目的。

集群模式1 ---- Master-slave主从共享数据

【解说】:生产者讲消息发送到Master节点,所有的客户端都连接这个消息队列共享这块数据区域。Master节点负责写入,一旦Master挂掉,slave节点继续服务。从而形成高可用。

集群模式2 ---- Master- slave主从同步部署方式

【解说】写入消息同样在Master主节点上,但是主节点会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类同。这样可以达到负载均衡的效果,如果消费者有多个,就可以去不同的节点消费,因为消息的拷贝和同步会占用很大的带宽和网络资源。在后续的 RabbitMQ 中会有使用

集群模式3 ---- Master- slave主从同步部署方式

【解说】和上面的区别不大,但可以在任意节点进行写入

集群模式4 ---- Master- slave主从同步部署方式

【解说】如果你插入的数据是broker-1中,元数据信息会存储数据的相关描述和记录存放的位置(队列)。它会对描述信息(元数据信息)进行同步,如果消费者在broker-2中进行消费,几乎没有对应的消息。可以从对应的元数据信息中去查询,然后返回对应的消息信息。 场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他会去联系其他的黄牛询问,如果有就返回。

集群模式5 ---- Master-slave与Breoker-cluster组合的方案

【解说】实现多主多从的模式,来完成消息的高可用以及数据的热备机制。在生产规模达到一定的阶段的时候,这种使用的频率比较高。

集群模式最终目的都是为保证:消息服务器不会挂掉,出现了故障依然可以保证消息服务继续使用。

终归三点: 要么消息共享;要么消息同步;要么元数据共享

2. 高可靠

高可用:指系统可以无故障低持续运行。比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。 在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。 如何保证中间件消息的可靠性呢?可以从两个方面考虑: 1:消息的传输:通过协议来保证系统间数据解析的正确性。 2:消息的存储可靠:通过持久化来保证消息的可靠性。

八、 RabbitMQ入门及安装

RabbitMQ官网:https://www.rabbitmq.com/

RabbitMQ is the most widely deployed open source message broker. With tens of thousands of users, RabbitMQ is one of the most popular open source message brokers. From T-Mobile to Runtastic, RabbitMQ is used worldwide at small startups and large enterprises. RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. RabbitMQ runs on many operating systems and cloud environments, and provides a wide range of developer tools for most popular languages.RabbitMQ是部署最广泛的开源消息代理。 RabbitMQ拥有数以万计的用户,是最受欢迎的开源消息代理之一。从T-Mobile到Runtastic,RabbitMQ在全球范围内用于小型初创企业和大型企业。 RabbitMQ是轻量级的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。 RabbitMQ在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发工具。

【概述】:RabbitMQ是一个开源的、遵循AMQP协议的、基于Erlang语言编写的消息代理,支持多种客户端(语言)。用于在分布式系统中 存储、转发消息,具有高可用、高可扩性、易用性等特征。

1. 安装RabbitMQ

官方安装包下载地址 rabbitmq下载地址:https://github.com/rabbitmq/rabbitmq-server/releases erlang下载地址:hhttps://hub.fastgit.org/rabbitmq/erlang-rpm/releases socket下载地址:http://www.rpmfind.net/linux/rpm2html/search.php?query=socat(x86-64)

1:下载地址:https://www.rabbitmq.com/download.html 2:环境准备:CentOS7.x+ / Erlang RabbitMQ是采用Erlang语言开发的。所以系统必须提供Erlang环境,先安装 Erlang

Erlang与RabbitMQ的版本对应:https://www.rabbitmq.com/which-erlang.html

安装 Erlang

下载地址:https://www.erlang-solutions.com/downloads/ 阿里云centos7.x中下载erlang。注:一定是 el7版本: 网页地址:https://packagecloud.io/rabbitmq/erlang

wget --content-disposition "https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm?distro_version_id=140"

安装

rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm

测试 erl -v

安装 socat

rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm

【注意】centos7系统,要下载 el7 版本: socket下载地址:http://www.rpmfind.net/linux/rpm2html/search.php?query=socat(x86-64)

安装 RabbitMQ

下载网站:https://packagecloud.io/app/rabbitmq/rabbitmq-server

使用wget的方式,获取安装包

wget --content-disposition "https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.10.0-1.el7.noarch.rpm/download.rpm?distro_version_id=140"

安装

rpm -ivh rabbitmq-server-3.10.0-1.el7.noarch.rpm

启动、关停

# 启动服务>

systemctl start rabbitmq-server

# 查看服务状态>

systemctl status rabbitmq-server

# 停止服务>

systemctl stop rabbitmq-server

# 开机启动服务>

systemctl enable rabbitmq-server

端口说明

端口说明5672RabbitMQ的通讯端口25672RabbitMQ的节点间的CLI通讯端口15672RabbitMQ HTTP_API的端口,管理员用户才能访问。用于管理RabbitMQ,需要启动Management插件。1883、8883MQTT插件启动时的端口61613、61614STOMP客户端插件启用的时候的端口15674、15675基于webscoket的STOMP端口和MOTT端口

2. Web管理界面及授权

管理界面

默认情况下,rabbitmq没有安装web端插件,需要安装才可以生效

rabbitmq-plugins enable rabbitmq_management

安装完毕,重启服务

systemctl restart rabbitmq-server

开启防火墙,通过公网访问15672端口,得到页面如下:

授权用户

rabbitmq有一个默认账号和密码是:guest 。默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户:

新增用户、密码

rabbitmqctl add_user admin admin

设置权限

rabbitmqctl set_user_tags admin administrator

用户级别及权限:

角色权限none不能访问management pluginmanagement查看自己相关节点信息 列出自己可以通过AMQP登入的虚拟机 查看自己的虚拟机节点 virtual hosts的queues、exchanges和bindings信息 查看和关闭自己的channels和connections 查看有关自己的虚拟机节点virtual hosts的统计信息(包括其他用户在这个节点中的活动信息)Policymaker包含management所有权限查看和创建和删除自己的virtual hosts所属的policies和parameters信息Monitoring包含management所有权限罗列出所有的virtual hosts,包括不能登录的virtual hosts。查看其他用户的connections和channels信息查看节点级别的数据如clustering和memory使用情况查看所有的virtual hosts的全局统计信息Administrator可以创建和删除virtual hosts可以查看,创建和删除users查看创建permisssions关闭所有用户的connections

为用户添加资源权限

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

使用admin登录web端页面: 命令小结

rabbitmqctl add_user 账号 密码 # 添加用户

rabbitmqctl set_user_tags 账号 administrator # 授权用户

rabbitmqctl change_password Username Newpassword # 修改密码

rabbitmqctl delete_user Username # 删除用户

rabbitmqctl list_users # 查看用户清单

# 为用户设置administrator角色

rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

3. Docker安装RabbitMQ

参考网站: 1:https://www.rabbitmq.com/download.html 2:https://registry.hub.docker.com/_/rabbitmq/

【注】rabbitmq:management 版本默认支持web端插件;rabbitmq:latest 版本提交最多,但需手动开启网页插件

# 1. 拉取镜像

docker pull rabbitmq:management

# 2. 创建并运行容器

# —hostname:指定容器主机名称

# —name:指定容器名称

# -p:将mq端口号映射到主机

# -e RABBITMQ_DEFAULT_USER= 设置用户名

# -e RABBITMQ_DEFAULT_PASS= 设置密码

docker run -d --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq

# 3. 开启网页客户端

docker exec 容器id rabbitmq-plugins enable rabbitmq_management`

九、原理讲解

1. AMQP协议

AMQP:(Advanced Message Queuing Protocol)高级消息队列协议。是应用层协议的一个开发标准,为面向消息的中间件设计

生产者与消费者流转过程如下:

为什么RabbitMQ是基于channel而不是连接的操作?

无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection。一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel),每个信道都会被指派一个唯一的 ID。Channel 是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

我们完全可以使用 Connection 就能完成信道的工作,为什么还要引入信道呢?

试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是多个 TCP 连接; 然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理; 每个线程把持一个信道,所有信道复用 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。 当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源;但是 信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。 信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的: 比如 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法; RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。

2. 核心组成部分

核心概念:

Server:又称Broker。接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server Connection:连接,应用程序与Broker的网络连接 。TCP/IP 三次握手和四次挥手 Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。 Message:消息:服务与应用程序之间传送的数据,由Properties和body组成。Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性;Body则就是消息体的内容。 Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力) Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.交换机和队列之间的绑定。 Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。消费者可能有多个,队列中的消息如果想要指定推送呢?就要通过这个key来查找投递给谁。 Queue:队列,也称为消息队列(Message Queue),保存消息并将它们转发给消费者

整体架构

运行流程

十、 RabbitMQ模式

1. 简单模式

创建maven项目 导入依赖

com.rabbitmq

amqp-client

5.10.0

org.springframework.amqp

spring-amqp

2.2.5.RELEASE

org.springframework.amqp

spring-rabbit

2.2.5.RELEASE

org.springframework.boot

spring-boot-starter-amqp

2.3.0.RELEASE

编写生产者

public class Producer {

public static void main(String[] args) {

//所有的中间价技术都是基于tcp/ip协议的,并在此协议上构建的。rabbitmq遵循的是amqp协议。

//所以既然如此,必然会有ip和port

//1.创建连接工程

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setHost("http://39.104.144.107");

connectionFactory.setPort(5672);

connectionFactory.setUsername("xinan");

connectionFactory.setPassword("xinan2");

connectionFactory.setVirtualHost("/");

Connection connection = null;

Channel channel = null;

try{

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明队列

String queueName = "queue1";

//声明队列时的五个参数

/**

* @params1:队列名称

* @params2:是否要持久化 durable=false,所谓持久化消息就是 是否会存盘,

* false:非持久化,true:持久化,非持久化会存盘吗?

* @params3:排他性,是否是一个独占队列

* @params4:最后一个消费者消费完消息之后,是否自动把队列删除

* @params5:携带一些附加的参数

*/

channel.queueDeclare(queueName,false,false,false,null);

//5.准备消息内容

String message = "Hello world";

//6.发送消息给队列

channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));

System.out.println("消息发送成功");

}

catch (Exception e){

e.printStackTrace();

}

finally {

try {

if (channel != null && channel.isOpen()) {

channel.close(); //7.关闭通道

}

if(connection != null && connection.isOpen()) {

connection.close(); //8.关闭连接

}

}

catch (Exception e){

e.printStackTrace();

}

}

}

}

执行上面代码,可以在web控制台看到 queue1 队列的信息 编写消费者代码

public class Consumer {

public static void main(String[] args) {

//1.创建连接工程

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setHost("39.104.144.107");

connectionFactory.setPort(5672);

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

connectionFactory.setVirtualHost("/");

Connection connection = null;

Channel channel = null;

try{

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.创建消费者

channel.basicConsume("queue1", true, new DeliverCallback() {

@Override

public void handle(String consumerTag, Delivery message) throws IOException {

System.out.println("收到的消息是: " + new String(message.getBody(), "UTF-8"));

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

System.out.println("接收消息失败。。。");

}

});

System.out.println("开始接收消息");

//在这里暂停一下,相当于getchar()

System.in.read();

}

catch (Exception e){

e.printStackTrace();

}

finally {

try {

if (channel != null && channel.isOpen()) {

channel.close(); //7.关闭通道

}

if(connection != null && connection.isOpen()) {

connection.close(); //8.关闭连接

}

}

catch (Exception e){

e.printStackTrace();

}

}

}

}

测试结果

持久化队列和非持久化队列的区别

持久化队列:在rabbitMQ服务重启后,该队列仍然存在,队列中的消息也会存在非持久化队列:队列只存在于rabbitMQ的运行过程中,服务重启后,队列不会存在;队列中的消息在rabbitMQ的运行过程中会持久化,但是在rabbitMQ服务重启后,持久化的消息也会消失。

2. 四种 exchage

1)fanout类型(发布与订阅模式)

交换机为 fanout 类型,通过交换机,向他下面的队列都发送一样的消息;指定路由key是没有意义的,仍然会所有的队列都会收到消息。

通过图像化界面操作

创建交换机 创建队列 交换机与队列绑定 发布消息 Ready、Total 数量 +1

核心代码

消费者1:

// 1. 声明交换机

String exchangeName = "fanout_exchang"; // 交换机名称

String type = "fanout"; // 类型

channel.exchangeDeclare(exchangeName, type);

// 2. 声明队列:订阅者fanout_queue1

String queueName = "fanout_queue1";

// 队列名,持久化,排他性,自动删除,携带额外的参数

channel.queueDeclare(queueName, false, false, false, null);

// 3. 队列&交换机绑定:其中 routingkey(也称之为 binding key)为空字符串

channel.queueBind(queueName, exchangeName, "");

// 4. 队列消费消息

channel.basicConsume(queueName, true, DeliverCallback(),CancelCallback());

消费者2:

// 1. 声明交换机

String exchangeName = "fanout_exchang"; // 交换机名称

String type = "fanout"; // 类型

channel.exchangeDeclare(exchangeName, type);

// 2. 声明队列:订阅者fanout_queue2

String queueName = "fanout_queue2";

// 队列名,持久化,排他性,自动删除,携带额外的参数

channel.queueDeclare(queueName, false, false, false, null);

// 3. 队列&交换机绑定:其中 routingkey(也称之为 binding key)为空字符串

channel.queueBind(queueName, exchangeName, "");

// 4. 队列消费消息

channel.basicConsume(queueName, true, DeliverCallback(),CancelCallback());

生产者:

// 1. 声明交换机:名称与类型

String exchangeName = "fanout_exchang"; //交换机名称

String exchangeType = "fanout"; //交换机类型

channel.exchangeDeclare(exchangeName, exchangeType); //声明交换机

// 向fanout_exchange交换机,发布消息

String message = "hello world , fanout!!!";

channel.basicPublish("fanout_exchang", "", null, message .getBytes(StandardCharsets.UTF_8));

2)Direct类型(Routing模式)

direct类型的交换机,通过队列设置不同的 Routing key ,来接受不同的消息; 交换机在发消息的时候,通过指定的不同的 Routing key ,来转发到指定的 Routing key的队列。

消费者1

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明队列

String queueName = "routing_queue1";

channel.queueDeclare(queueName, false, false, false, null);

//5.绑定交换机与队列

String exchangeName = "direct_exchange";

String routingkey = "error";

channel.queueBind(queueName, exchangeName, routingkey);

//6.创建消费者

channel.basicConsume(queueName, true, DeliverCallback(),CancelCallback());

消费者2

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明队列

String queueName = "routing_queue2";

channel.queueDeclare(queueName, false, false, false, null);

//5.绑定交换机与队列

String exchangeName = "direct_exchange";

String routingkey = "info";

channel.queueBind(queueName, exchangeName, routingkey);

//6.消费消息

channel.basicConsume(queueName, true, DeliverCallback(),CancelCallback());

生产者

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明交换机(如web端已操作,则省略)

String exchangeName = "direct_exchange";

String type = "direct";

channel.exchangeDeclare(exchangeName, type);

//5.准备消息内容

String message = "Hello world!!!! direct";

String routingKey = "info";

//6.发送消息给队列

channel.basicPublish(exchangeName,routingKey ,null,message.getBytes(StandardCharsets.UTF_8));

message.getBytes();

3)topic类型(主题模式)

设置交换机为topic类型,通过 Routing key 模式匹配来分发队列里面的消息

* : 匹配一个单词# : 匹配 0个或多个单词

表达式含义*.orange.*前面有1级,orange,后面有1级*.*.rabbit前面有1级,中间有1级,rabbitlazy.#lazy后面可以有0个或多个,都是匹配的

生产者

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明交换机(如web端已操作,则省略)

String exchangeName = "topic_exchange";

String type = "topic";

channel.exchangeDeclare(exchangeName, type);

//5.准备消息内容

String message = "Hello world!!!! topic1"; //发送的消息

String routingkey = "com.orange.rabbit"; //routingKey

String routingkey2 = "lazy.xinan"; //routingKey

//6.发送消息给队列

channel.basicPublish(exchangeName,routingkey,null,message.getBytes(StandardCharsets.UTF_8));

消费者1

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明队列

String queueName = "topic_queue1";

channel.queueDeclare(queueName, false, false, false, null);

//5.绑定交换机与队列

String exchangeName = "topic_exchange";

String routingkey = "*.orange.*";

channel.queueBind(queueName, exchangeName, routingkey);

//6.消费消息

。。。。。。

消费者2

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明队列

String queueName = "topic_queue2";

channel.queueDeclare(queueName, false, false, false, null);

//5.绑定交换机与队列

String exchangeName = "topic_exchange";

String routingkey1 = "*.*.rabbit";

String routingkey2 = "lazy.#";

channel.queueBind(queueName, exchangeName, routingkey1);

channel.queueBind(queueName, exchangeName, routingkey2);

//6.消费消息

。。。。。。

4)Headers类型 【几乎不用】

交换机为Headers模式,队列设置不同的参数; 发送消息时,通过不同的参数,将消息转达到不同的队列

队列Arguments举例结果queue1x:1 y:1x=1queue2能收到queue2x:1x=1,y=1queue1、queue2能收到queue3x:2 y:1x=2都收不到

生产者

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明交换机(如web端已操作,则省略)

String exchangeName = "topic_exchange_header";

String type = "headers";

channel.exchangeDeclare(exchangeName, type);

//5.准备消息内容

String message = "Hello world!!!! topic_header8-----------"; //发送的消息

//6.定义 header

Map header = new HashMap();

header.put("x","1");

header.put("y","1");

AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().headers(header);

//7.发送消息给队列

channel.basicPublish(exchangeName,"",properties.build(),message.getBytes(StandardCharsets.UTF_8));

消费者1

//4.声明队列

String queueName = "topic_header_queue1";

channel.queueDeclare(queueName, false, false, false, null);

//5.定义 header

Map header = new HashMap();

//x-match: all表所有key-value全部匹配才匹配成功 ,any表只需要匹配任意一个key-value 即匹配成功。

header.put("x-match", "all");

header.put("x","1");

header.put("y","2");

//6.绑定交换机与队列

String exchangeName = "topic_exchange_header";

channel.queueBind(queueName, exchangeName,"", header); //绑定的时候,传入 header

//7.创建消费者消费消息

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(message);

}

};

//8.消费消息

channel.basicConsume(queueName, true, defaultConsumer);

消费者2

//4.声明队列

String queueName = "topic_header_queue2";

channel.queueDeclare(queueName, false, false, false, null);

//5.定义 header

Map header = new HashMap();

//x-match: all表所有key-value全部匹配才匹配成功 ,any表只需要匹配任意一个key-value 即匹配成功。

header.put("x-match", "all");

header.put("x","1");

//6.绑定交换机与队列

String exchangeName = "topic_exchange_header";

channel.queueBind(queueName, exchangeName, "",header);

//7.创建消费者消费消息

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(message);

}

};

//8.消费消息

channel.basicConsume(queueName, true, defaultConsumer);

3. Work模式

当有多个消费者时,消息会被哪个消费者消费呢?该如何均衡消费者消费信息的多少呢? 主要有两种模式: 1、轮询分发:一个消费者一条,按均分配; 2、公平分发:根据消费者的消费能力进行公平分发。处理快的处理的多,处理慢的处理的少;

1)轮询模式

特点:当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成;并不会因为某个服务器的效率低,而出现不均等分发的现象。

生产者

生产者使用默认交换机,发布消息时,填写第二个参数【指明发动到哪个队列】生产者使用自定义交换机,发布消息时,不要填写第二个参数【此时代表routingKey】

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.声明交换机

String exchangeName = "work_exchange_polling";

String type = "direct";

channel.exchangeDeclare(exchangeName, type);

//5.声明队列

String queueName = "work_polling_queue";

channel.queueDeclare(queueName, false, false, false, null);

//6.绑定队列与交换机

channel.queueBind(queueName,exchangeName,"");

for (int i = 1; i <= 20; i++) {

//消息的内容

String msg = "hello: " + i;

// 7: 发送消息给中间件rabbitmq-server

// @params1: 交换机 exchange

// @params2: 队列名称 / routingkey

// 如果指定了交换机名称,第二个参数代表 routingKey;

// 如果没指定交换机名称,第二个参数代表 队列名称

// @params3: 属性配置

// @params4: 发送消息的内容

channel.basicPublish(exchangeName, "", null, msg.getBytes());

}

消费者

消费者1、2代码相同,消费的是同一个队列【queueName相同】channel.basicQos(1);,指定每次轮询,消费多少条消息

//4.要消费的队列

String queueName = "work_polling_queue";

//5.绑定交换机与队列

String exchangeName = "work_exchange_polling";

channel.queueBind(queueName, exchangeName,"");

//6.创建消费者消费消息

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(message);

}

};

//7.每次分发多少条消息

channel.basicQos(1);

//8.消费消息(轮询分发,@param2 true代表自动应答)

channel.basicConsume(queueName, true, new DeliverCallback() {

@Override

public void handle(String consumerTag, Delivery message) throws IOException {

System.out.println("收到的消息是: " + new String(message.getBody(), "UTF-8"));

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

System.out.println("接收消息失败。。。");

}

});

//【模拟服务器延迟】消费者1 沉睡1000ms ; 消费者2 沉睡200ms

Thread.sleep(1000);

2)公平模式

生产者代码,与轮询分发完全一样消费者 (1) 消费者1、2代码相同,消费的是同一个队列【queueName相同】 (2) 采用手动应答的方式,每次消费一条消息

//2.创建Connection,名字叫做Producer

connection = connectionFactory.newConnection("Producer");

//3.通过连接获取通道channel

channel = connection.createChannel();

//4.要消费的队列

String queueName = "work_polling_queue";

//5.绑定交换机与队列

String exchangeName = "work_exchange_polling";

channel.queueBind(queueName, exchangeName,"");

//6.创建消费者消费消息

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println(message);

}

};

//7.定义接收消息的回调

Channel finalChannel = channel;

finalChannel.basicQos(1); //每次消费一条

//8.消费消息(第二个参数false: 采取手动应答)

channel.basicConsume(queueName, false, new DeliverCallback() {

@Override

public void handle(String consumerTag, Delivery message) throws IOException {

try {

System.out.println("收到的消息是: " + new String(message.getBody(), "UTF-8"));

Thread.sleep(1000); // 两个消费者,一个沉睡1000ms,一个沉睡 200ms

// 两个参数:应答方式、是否批量消费

finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);

} catch (Exception e) {

e.printStackTrace();

}

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

System.out.println("接收消息失败。。。");

}

});

十一、使用场景

解耦、削峰、异步

1、串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。如果串行执行,只有三个任务全部完成,业务才能完成。

/**

* 串行执行代码:

* 如果有事务,任何一个发送任务出错,都可能造成事务的回滚

*/

public void makeOrder(){

// 1 :保存订单

orderService.saveOrder();

// 2: 发送短信服务

messageService.sendSMS("order");//1-2 s

// 3: 发送email服务

emailService.sendEmail("order");//1-2 s

// 4: 发送APP服务

appService.sendApp("order");

}

2、并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

/**

* 异步执行代码:

* 采用线程池的方式,同时发送三种信息

*/

public void makeOrder(){

// 1 :保存订单

orderService.saveOrder();

// 相关发送

relationMessage();

}

public void relationMessage(){

// 1:异步发送短信服务

theadpool.submit(new Callable{

public Object call(){

messageService.sendSMS("order");

}

})

// 2:异步发送email服务

theadpool.submit(new Callable{

public Object call(){

emailService.sendEmail("order");

}

})

// 3: 异步发送APP服务

theadpool.submit(new Callable{

public Object call(){

appService.sendApp("order");

}

})

}

存在问题: 1:耦合度高 2:需要自己写线程池,自己维护成本太高 3:出现了消息可能会丢失,需要你自己做消息补偿 4:如何保证消息的可靠性你自己写 5:如果服务器承载不了,你需要自己去写高可用

3、异步消息队列 好处 1:完全解耦,用MQ建立桥接 2:有独立的线程池和运行模型 3:出现了消息可能会丢失,MQ有持久化功能 4:如何保证消息的可靠性,死信队列和消息转移的等 5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。 按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍

public void makeOrder(){

// 1 :保存订单

orderService.saveOrder();

// 向MQ中发送消息

rabbitTemplate.convertSend("ex","2","消息内容");

}

高内聚、低耦合

流量削峰

十二、SpringBoot整合

创建一个空项目,然后创建两个module,分别导入如下依赖:

org.springframework.boot

spring-boot-starter-amqp

配置yaml文件

server:

port: 8080 #项目端口

spring:

rabbitmq: # rabbitMq相关信息

username: admin

password: admin

virtual-host: /

host: 39.104.144.107

port: 5672

1. fanout类型

生产者

配置类:FanoutRabbitMqConfig

@Configuration

public class FanoutRabbitMqConfig {

// 1. 声明 fanout 交换机

@Bean

public FanoutExchange fanoutExchange(){

// 交换机名称、是否持久化、是否自动删除

return new FanoutExchange("fanout_order_exchange",true,false);

}

// 2. 声明队列

@Bean

public Queue smsQueue(){

return new Queue("sms.fanout.queue",true);//交换机名称,是否持久化

}

@Bean

public Queue emailQueue(){

return new Queue("email.fanout.queue",true);

}

@Bean

public Queue wechatQueue(){

return new Queue("wechat.fanout.queue",true);

}

// 3. 交换机与队列绑定

@Bean

public Binding smsBinding(){

return BindingBuilder.bind(smsQueue()).to(fanoutExchange());

}

@Bean

public Binding emailBinding(){

return BindingBuilder.bind(emailQueue()).to(fanoutExchange());

}

@Bean

public Binding wechatlBinding(){

return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());

}

}

业务处理类:OrderService

//订单生产者

@Service

public class OrderService {

@Resource

private RabbitTemplate rabbitTemplate;

/**

* 模拟用户进行商品的下单

* @param userId 用户id @param goodsId 商品id @param num 数量

*/

public void makeOrder(String userId,String goodsId, int num){

System.out.println("用户“"+userId+"”下单成功,订单id为:" + goodsId);

// MQ实现消息的转发

String exchangeName = "fanout_order_exchange";

String routingKey = "";

String msg = "用户id:"+userId+" 订单id:"+goodsId+" 商品数量:"+num;

rabbitTemplate.convertAndSend(exchangeName,routingKey,msg);

}

}

测试类代码

@Test // 生成订单

void sendOrder() {

orderService.makeOrder("xinan123","jklsdfjkkk",8);

}

消费者

消费者

//三个消费者,只是监听的队列不一样

@Service

@RabbitListener(queues = {"email.fanout.queue"})

public class FanoutEmailConsumer {

// 处理收到的消息

@RabbitHandler

public void receiveMessage(String message){

System.out.println("email fanout 接受的订单信息为:" + message);

}

}

测试结果

成功创建交换机、绑定队列 消费端成功接收消息

2. direct类型

配置类

@Configuration

public class DirectRabbitMqConfig {

// 1. 声明fanout交换机

@Bean

public DirectExchange directExchange(){

return new DirectExchange("direct_order_exchange",true,false);

}

// 2. 声明队列

@Bean

public Queue smsDirectQueue(){

return new Queue("sms.direct.queue",true) ;

}

@Bean

public Queue emailDirectQueue(){

return new Queue("email.direct.queue",true) ;

}

@Bean

public Queue wechatDirectQueue(){

return new Queue("wechat.direct.queue",true) ;

}

// 3. 交换机与队列之间的绑定

@Bean

public Binding smsDirectBinding(){

return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");

}

@Bean

public Binding emailDirectBinding(){

return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");

}

@Bean

public Binding wechatDirectBinding(){

return BindingBuilder.bind(wechatDirectQueue()).to(directExchange()).with("wechat");

}

}

生产者(在OrderService 中新增方法)

/**

* 路由模式模拟用户下单

* @param userId:用户id goodsId:商品id num:数量 routingKey:路由key

*/

public void makeOrderDirect(String userId,String goodsId, int num, String routingKey ){

String message = "订单生成成功 : " + goodsId + " ,userId : " + userId + ",goodsId : "+ goodsId + ", num : " + num;

System.out.println(message);

String exchangeName = "direct_order_exchange";

rabbitTemplate.convertAndSend(exchangeName,routingKey,message);

}

消费者

// 三个消费者监听的队列不同,代码一致:

@RabbitListener(queues = {"email.direct.queue"})

@Service

public class DirectEmailConsumer {

@RabbitHandler

public void receiveMessage(String message){

System.out.println("email direct 接受的订单信息为:" + message);

}

}

测试方法

@Test

void testDirect() {

orderService.makeOrderDirect("xinan","my sms",6,"sms");

}

结果

3. topic类型

该类型的配置,使用注解来演示: 在消费者这边,在注解@RabbitListener中,表明当前队列的参数、绑定的交换机等信息

消费者

// 使用注解配置交换机、队列、绑定关系、routingKey

// ---- 如果没有交换机、队列,则创建 ----

@Component

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),

exchange = @Exchange(value = "topic.order.exchange", type = ExchangeTypes.TOPIC),

key = "*.email.#"

))

public class EmailWechatConsumer {

@RabbitHandler

public void receiveMessage(String message){

System.out.println("email topic 接受的订单信息为:" + message);

}

}

运行消费者项目,在rabbitMq页面,看到关系绑定成功: 2. 生产者

/**

* topic 模式中用户下单

* @param userId:用户id goodsId:商品id num:数量 routingKey:路由key

*/

public void makeOrderTopic(String userId,String goodsId, int num, String routingKey ){

String orderId = UUID.randomUUID().toString();

String message = "订单生成成功 : " + orderId + " ,userId : " + userId + ",goodsId : "+ goodsId + ", num : " + num;

System.out.println(message);

// MQ实现消息的转发

String exchangeName = "topic.order.exchange";

rabbitTemplate.convertAndSend(exchangeName,routingKey,message);

}

测试方法

@Test

void testTopic(){

//userId = 12,wechat、email收到

//userId = 22,wechat、email、sms全部都会收到

orderService.makeOrderTopic("12","12",12,"com.email");

orderService.makeOrderTopic("22","22",22,"com.email.sms");

}

结果

十三、RabbitMQ高级

1. 过期时间TTL

设置消息的预期时间,只有在这个时间段之内,消息才能够被消费者接受。过了时间之后,消息就会被自动删除。 总共有两种方式设置过期时间:队列和消息。

队列设置TTL,队列中所有消息都有 相同 的过期时间;消息设置TTL,每条消息的过期时间可以不同。

如果队列和消息都设置TTL,哪个时间短,就以哪一个过期时间为准。 消息在TTL队列里面,一旦超过了设置的TTL值,就被称为dead message,会被投递到死信队列,消费者将无法再收到消息。(TTL消息过期,则移除)

队列设置TTL

配置类

@Configuration

public class TTLRabbitmqConfig {

@Bean

public DirectExchange directTtlExchange() {

return new DirectExchange("direct_ttl_exchange", true, false);

}

@Bean

public Queue directTtlQueue() {

// 设置队列参数中的的过期时间,单位毫秒,且为整型

Map args = new HashMap<>();

args.put("x-message-ttl", 5000);

return new Queue("direct_ttl_queue", true,false,false,args);

}

@Bean

public Binding directTtlBinding(){

return BindingBuilder.bind(directTtlQueue()).to(directTtlExchange()).with("ttl");

}

}

生产者业务代码

// 路由模式来测试过期时间TTL:设置队列的过期时间

// @param userId:用户id goodsId:商品id num:数量 routingKey:路由key

public void makeTtlDirect(String userId, String goodsId, int num, String routingKey) {

String orderId = UUID.randomUUID().toString();

String message = "订单生成成功 : " + orderId + " ,userId : " + userId + ",goodsId : " + goodsId + ", num : " + num;

System.out.println(message);

// MQ实现消息的转发

String exchangeName = "direct_ttl_exchange";

rabbitTemplate.convertAndSend(exchangeName, routingKey, message);

}

测试代码

@Test

void testTtlQueue(){

orderService.makeTtlDirect("12","12",12,"ttl");

}

结果:队列带有TTL标志,5秒后消息自动移除

消息设置过期时间

生产者业务代码

//在TTL队列中,单独设置消息的过期时间

// @param userId:用户id goodsId:商品id num:数量 routingKey:路由key

public void makeTtlMessageDirect(String userId, String goodsId, int num, String routingKey) {

String orderId = UUID.randomUUID().toString();

String message = "订单生成成功 : " + orderId + " ,userId : " + userId + ",goodsId : " + goodsId + ", num : " + num + routingKey;

System.out.println(message);

String exchangeName = "direct_ttl_message_exchange";

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 设置消息的过期时间、编码格式

message.getMessageProperties().setExpiration("3000");

message.getMessageProperties().setContentEncoding("UTF-8");

return message;

}

};

rabbitTemplate.convertAndSend(exchangeName, routingKey, message, messagePostProcessor);

}

测试代码

@Test

void testTtlMessageQueue(){

orderService.makeTtlMessageDirect("12","12",12,"ttl");

}

【结果】队列过期时间5s,消息TTL时间3s:3s后消息过期

2. 死信队列

DLX(Dead-Letter-Exchange)也叫 死信交换机、死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。 消息变成死信,可能是由于以下的原因:

消息被拒绝消息过期队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重发到设置的DLX上去,进而被路由到另一个队列,即死信队列。 要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

创建交换机、队列,接收过期的消息

//创建交换机、队列,和普通的交换机、队列一样

@Configuration

public class DeadRabbitmqConfig {

@Bean

public DirectExchange deadDirectExchange(){

return new DirectExchange("dead_direct_exchange",true,false);

}

@Bean

public Queue deadQueue(){

return new Queue("dead_direct_queue",true,false,false);

}

@Bean

public Binding deadBinding(){

return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead_message");

}

}

创建死心队列(普通队列设置过期时间、绑定过期后接收的交换机)

//创建死信队列(绑定死信交换机的队列)

@Configuration

public class DeadTtlQueueConfig {

@Bean

public DirectExchange deadDirectTtlExchange() {

return new DirectExchange("dead_direct_ttl_exchange", true, false);

}

@Bean

public Queue deadDirectTtlQueue() {

Map args = new HashMap<>();

args.put("x-message-ttl", 5000);

// 设置队列的存放消息的最大长度 (超过这个长度,自动转发到死信交换机)

args.put("x-max-length", 5);

// 绑定死信转发的队列 :dead_direct_exchange

args.put("x-dead-letter-exchange", "dead_direct_exchange");

// 因为死信队列是direct模式。通过 routing key,配置要发送给死信队列的哪一个队列。

// 如果是fanout模式,则不需要配置 routing key

args.put("x-dead-letter-routing-key", "dead_message");

return new Queue("dead_direct_ttl_queue", true, false, false, args);

}

@Bean

public Binding deadDirectTtlBinding() {

return BindingBuilder.bind(deadDirectTtlQueue()).to(deadDirectTtlExchange()).with("ttl");

}

}

业务处理方法、测试方法

// 测试方法

@Test

void testDeadTtlQueue(){

orderService.makeDeadTtlDirect("user123","goods666",12,"ttl");

}

// 业务处理方法(生成订单,发送到死信队列)

public void makeDeadTtlDirect(String userId, String goodsId, int num, String routingKey) {

String message = "订单生成成功 : " + goodsId + " ,userId : " + userId + ",goodsId : " + goodsId + ", num : " + num;

System.out.println(message);

// MQ实现消息的转发

String exchangeName = "dead_direct_ttl_exchange";

rabbitTemplate.convertAndSend(exchangeName, routingKey, message);

}

死信队列消费端

@Service

public class DeadConsumer {

@RabbitListener(queues = {"dead_direct_queue"})

public void messageOnDeadOrderCancelQueue(String orderMessage, Channel channel, CorrelationData correlationData,

@Header(AmqpHeaders.DELIVERY_TAG) long tags) throws IOException {

try {

System.out.println("-------- 监听到dead_direct_queue队列的消息 --------");

System.out.println("-------- 死信消息:"+ orderMessage + ",当前的时间为:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));;

// 省略业务代码。。。

channel.basicAck(tags, false); // 手动ack

} catch (IOException e) {

// 当前死信队列处理消息出错

System.out.println("当前死信队列处理消息出错, 把消息存入数据库");

System.out.println("错误信息为:" + e.getMessage());

// 丢弃消息,但是由于当前队列并未绑定死信队列,所以直接丢弃

channel.basicNack(tags,false,false);

}

}

}

测试结果

超过5s没被消费,消息转移到dead_direct_queue中 消息被消费,控制台打印:

十四、运行监控

RabbitMQ官网配置详解:https://www.rabbitmq.com/configure.html

默认情况下,RabbitMQ使用物理内存的阈值是40%。当内存使用超过阈值 or 磁盘空间剩余空间低于配置的阈值时,会发出内存警告,阻塞所有发布消息的连接,一旦警告解除(例如:服务器paging消息到硬盘或者分发消息到消费者并且确认)服务会恢复正常。

1. 内存控制

通过命令配置内存

# 命令格式

rabbitmqctl set_vm_memory_high_watermark

#例:相对值百分比(设置为内存的 60%)

rabbitmqctl set_vm_memory_high_watermark 0.6

#例:绝对值(设置为固定内存,单位 KB、MB、GB)

rabbitmqctl set_vm_memory_high_watermark absolute 50MB

为内存阈值,默认是物理内存的0.4; 通过命令行修改阈值立即生效,但在Broker重启之后失效;通过配置文件修改,重启Broker才会生效。

如下,调小了磁盘的大小来演示内存警告:

配置文件修改 找到配置文件:/xxx/xxxxxx/rabbitmq.conf

# 触发流控制的内存阈值。可以是绝对的或相对于操作系统可用的 RAM 量:

# 使用 relative 相对值进行设置fraction,建议在0.4~0.7之间,不建议超过0.7

vm_memory_high_watermark.relative = 0.6

# 使用 absolute 绝对值,单位是KB、MB、GB等。

vm_memory_high_watermark.absolute = 2 GB

2. 磁盘预警

当磁盘的剩余空间低于确定的阈值,RabbitMQ会同样的阻塞生产者。这样可以避免因非持久化的消息持续换页而耗尽磁盘空间,导致服务器崩溃。

在默认情况下,磁盘为50MB会进行预警:存储可持续化序列所在磁盘分区,还剩50MB的时候,会阻塞生产者、并停止内存消息换页到磁盘的过程。

这个阈值可以减少,但是不能完全的消除因磁盘耗尽导致的崩溃的可能性。比如在两次磁盘空间的检查间隙内,第一次检查是60MB,第二次检查1MB,就会出现警告。

命令行修改

rabbitmqctl set_disk_free_limit

rabbitmqctl set_disk_free_limit memory_limit

# disk_limit:固定单位 KB MB GB

# fraction :是相对阈值,建议范围在:1.0~2.0之间。(相对于内存)

配置文件修改

disk_free_limit.relative = 3.0

disk_free_limit.absolute = 50mb

3. 内存换页

在某个Broker节点以及内存阻塞生产者之前,他会尝试将队列中的消息换页到磁盘以释放内存,持久化和非持久化的消息都会写入磁盘。其中,持久化的消息本身在磁盘中有一个副本,所以,在转移的过程中持久化的消息会先从内存中消除掉。

在默认情况下,内存到达阈值是50%,就会进行换页处理。

也就是说,在默认情况下,该内存的阈值为0.4情况下,当内存使用超过了0.2(0.4*0.5)时,会进行换页动作。

vm_memory_high_watermark.relative = 0.4 #内存占用阈值

vm_memory_high_watermark_paging_ratio = 0.7(设置小于1的值)#换页百分比

十五、集群搭建

RabbitMq集群

RabbitMQ是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。 在实际使用过程中多采取多机多实例部署方式,但下面采取单机多实例的方式,去搭建一个rabbitmq集群。

查看服务状态

配置的前提是你的rabbitmq可以运行起来,比如 “ps aux|grep rabbitmq” 你能看到相关进程,又比如运行 “rabbitmqctl status” 可以看到运行状态

确保RabbitMQ可以运行的,然后使用命令"systemctl stop rabbitmq-server"把单机版的RabbitMQ服务停止。

单机多应用

场景:假设有两个rabbitmq节点,分别为rabbit-1(主节点)、 rabbit-2(从节点)。 启动命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached 结束命令:rabbitmqctl -n rabbit-1 stop

启动主节点 rabbit-1

[root@xinan ~] RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &

[1] 1591

[root@xinan ~]# 2023-03-10 09:49:58.273195+08:00 [info] <0.222.0> Feature flags: list of feature flags found:

...... 省略日志 ......

Starting broker... completed with 3 plugins.

启动从节点 rabbit-2 【注意】:web管理插件默认端口15672被rabbit-1占用,rabbit-2需要另外指定 RABBITMQ_SERVER_START_ARGS=”-rabbitmq_management listener [{port,15673}]”

[root@xinan ~]#sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &

查看启动状态:ps aux|grep rabbitmqrabbit-1设置为主节点

#停止应用

> sudo rabbitmqctl -n rabbit-1 stop_app

#清除节点上的历史数据(如果不清除,无法将节点加入到集群)

> sudo rabbitmqctl -n rabbit-1 reset

#启动应用

> sudo rabbitmqctl -n rabbit-1 start_app

rabbit-2设置为从节点

# 停止应用

> sudo rabbitmqctl -n rabbit-2 stop_app

# 清除节点上的历史数据(如果不清除,无法将节点加入到集群)

> sudo rabbitmqctl -n rabbit-2 reset

# 将rabbit2节点加入到rabbit1(主节点)集群当中【Server-node为服务器的主机名】

# 如果是多机部署多应用,Server-node 替换为ip地址,或通过hosts文件映射ip

> # sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@'Server-node'

> sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@xinan

# 启动应用

> sudo rabbitmqctl -n rabbit-2 start_app

验证集群状态

> sudo rabbitmqctl cluster_status -n rabbit-1

给节点node-1、node-2 设置用户名和密码,赋予权限:

> rabbitmqctl -n rabbit-1 add_user admin admin

> rabbitmqctl -n rabbit-1 set_user_tags admin administrator

> rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"

> rabbitmqctl -n rabbit-2 add_user admin admin

> rabbitmqctl -n rabbit-2 set_user_tags admin administrator

> rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"

登录web控制台,查看节点信息 小结

Tips: 如果采用多机部署方式,需读取其中一个节点的cookie, 并复制到其他节点(节点之间通过cookie确定相互是否可通信) cookie存放在:/var/lib/rabbitmq/.erlang.cookie ————————————————————————————— 例如:主机名分别为rabbit-1、rabbit-2 1、逐个启动各节点 2、配置各节点的hosts文件( vim /etc/hosts) ​ ip1:rabbit-1 ​ ip2:rabbit-2 其它步骤雷同单机部署方式

十六、分布式事务概念

概述

【分布式事务】:事务的操作位于不同的节点上,需要保证事务的 AICD 特性。 例如:库存和订单如果不在同一个节点上,就涉及分布式事务。

1、分布式事务的方式

在分布式系统中,要实现分布式事务,无外乎那几种解决方案:

一、两阶段提交(2PC)

两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。需要数据库产商的支持,java组件有atomikos等。

准备阶段 协调者询问参与者事务是否执行成功,参与者发回事务执行结果。 提交阶段 如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。 【注意】:在准备阶段,参与者执行了事务,但是还未提交;只有在提交阶段,接收到协调者发来的通知后,才进行提交或者回滚。 存在的问题 1) 同步阻塞:所有事务参与者,在等待其它参与者响应的时候,都处于同步阻塞状态,无法进行其它操作。 2)单点问题:协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。 3) 数据不一致:在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。 4) 太过保守:任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

二、补偿事务(TCC) 严选、阿里、蚂蚁金服

TCC 采用补偿机制,核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

Try 阶段主要是对业务系统做检测及资源预留Confirm 阶段主要是对业务系统做确认提交。Try阶段执行成功后开始执行 Confirm阶段时,默认 - - Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

举个例子,假入 Bob 要向 Smith 转账,思路大概是: 我们有一个本地方法,里面依次调用 1:首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。 2:在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。 3:如果第2步执行成功,那么转账成功;如果第2步失败,则调用对应的远程解冻方法 (Cancel)。

优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些 缺点: 缺点还是比较明显的,在2、3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

三、本地消息表(异步确保) 【使用场景】:支付宝、微信支付主动查询支付状态、对账单 本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务,来保证对这两个表的操作满足ACID,并且使用了消息队列来保证最终一致性。

在分布式事务的一方完成写业务的操作之后,向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。在分布式事务的另一方,从消息队列中读取一个消息,并执行消息中的操作。 优点: 避免了分布式事务,实现了最终一致性。 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

四、MQ 事务消息 异步场景,通用性较强,拓展性较高。

有一些第三方的MQ是支持事务消息的(比如RocketMQ),他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 Kafka 不支持。 以阿里的 RocketMQ中间件为例,其思路大致为:

第一阶段Prepared消息,会拿到消息的地址; 第二阶段执行本地事务;第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。也就是说在业务方法内,要向消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ 会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。 优点: 实现了最终一致性,不需要依赖本地数据库事务。 缺点: 实现难度大,主流MQ不支持,RocketMQ事务消息部分代码也未开源。

五、总结 分布式事务本身是一个技术难题,是没有一种完美的方案,能够应对所有场景。要根据具体业务场景去抉择。阿里RocketMQ去实现的分布式事务,现在也有除了很多分布式事务的协调器,比如LCN等。

十七、分布式事务实现

分布式事务完整架构图 美团外卖架构图 系统与系统之间的分布式事务问题

1. 系统间调用de事务回滚问题

@Service

public class OrderService {

@Autowired

private OrderMapper orderMapper;

// 创建订单

@Transactional(rollbackFor = Exception.class) // 订单创建整个方法添加事务

public void createOrder(Order orderInfo) throws Exception {

// 1: 订单信息--插入丁订单系统,订单数据库事务

orderMapper.addOrder(orderInfo);

// 2:通過Http接口发送订单信息到运单系统

String result = dispatchHttpApi(orderInfo.getOrderId());

if(!"success".equals(result)) {

throw new Exception("订单创建失败,原因是运单接口调用失败!");

}

}

/**

* 模拟http请求调用运单系统,创建运单信息

* 如果调用运单系统,连接时间超过3s、处理时间超过2s,就会触发异常

* 此时运单可能保存成功,但订单一定保存失败!!!

*/

private String dispatchHttpApi(String orderId) {

SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();

// 链接超时 > 3秒

factory.setConnectTimeout(3000);

// 处理超时 > 2秒

factory.setReadTimeout(2000);

// 发送http请求

String url = "http://localhost:9000/dispatcher/insert?orderId="+orderId;

RestTemplate restTemplate = new RestTemplate(factory);//异常

String result = restTemplate.getForObject(url, String.class);

return result;

}

}

2. 分布式事务整体设计思路

3. 解决可靠生产问题-定时重发

新建一张冗余表,设置消息发送状态(status),消息发送成功后收到回执,修改status的值。如果MQ服务器出现了异常和故障,消息是无法获取到回执信息,则 定时重发:

【原理】采用rabbitMQ的消息确认机制,在生产者将消息投递给队列后,判断队列发送过来的ACK回执,从而判断消息是否有没有可靠投递

# 开启MQ消息确认机制

spring.rabbitmq.publisher-confirm-type: correlated

// 测试方法

@Test

void mqOrder() throws Exception {

String orderId = UUID.randomUUID().toString().replace("-", "");

MqOrder mqOrder = new MqOrder(orderId, "xinan123", "买个烤肠", new Date(), 0);

mqOrderService.createOrder(mqOrder);

}

/**

* 创建订单,并把订单信息发送到Mq服务

* 1. 创建订单,冗余字段 status=0(默认没有发送成功)

* 2. 通过确认机制:

* 开启配置:spring.rabbitmq.publisher-confirm-type: correlated

* mq成功接收到消息后,会给一个确认信息;收到确认信息,将status=1

*/

public void createOrder(MqOrder orderInfo) throws Exception {

// 1: 订单信息--插入丁订单系统,订单数据库事务

mqOrderMapper.addMqOrder(orderInfo);

// 2.发送Mq消息(发送消息的时候,关联 orderId)

String orderJson = JSONObject.toJSONString(orderInfo);

rabbitTemplate.convertAndSend("fanout_order_exchange","", orderJson,

new CorrelationData(orderInfo.getOrderId()));

}

//该注解用来修饰一个非静态的void()方法,在服务器加载Servlet的时候运行

//被修饰的方法只执行一次:在构造函数之后、init()方法之前执行

@PostConstruct

public void confirmMqSend(){

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

System.out.println("cause:" + cause);

//如果ack为true,则表示消息已经被i收到

String orderId = correlationData.getId();

if(!ack){

System.out.println("MQ队列响应失败,orderID是:" + orderId);

return;

}

try {

boolean flag = mqOrderMapper.updateStatus(orderId, 1);

if(flag){

System.out.println("本地消息状态修改成功,消息成功推送到 MQ 服务");

}

} catch (Exception e) {

System.out.println("本地消息状态修改失败,出现异常:"+e.getMessage());

}

}

});

}

如果Mq服务宕机,则定时扫描state=0的消息,重新发送

@EnableScheduling

public class ResendMq{

@Scheduled(cron = "xxxxxx")

public void sendMessage(){

//把状态为0的消息重新查询出来,投递到MQ中

}

}

4. 可靠消费

如果在消息消费时出现异常,消费端会不停的请求MQ服务,造成死循环:

@Service

public class OrderMqConsumer {

@RabbitListener(queues = {"fanout.order.queue"})

public void orderMqConsumer(String orderMsg, Channel channel,

CorrelationData correlationData,

@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{

// 1,获取队列中的消息

System.out.println("队列中的消息为:"+orderMsg);

// 2. 转换成json对象

MqOrder mqOrder = JSONObject.parseObject(orderMsg, MqOrder.class);

// 3. 获取订单id

String orderId = mqOrder.getOrderId();

System.out.println("消费端获取到订单id: "+orderId);

// ----------------------------------

// 如果此处出现异常,消费端会不停的请求MQ服务,造成死循环,进而导致服务器压力过大、宕机

// ----------------------------------

// 4. 省略业务操作。。。。

}

}

解决方案:

控制消费者的重试次数,同时给交换机配死信队列(超过重试次数,消息转移到死信队列中)使用try catch + 手动ack使用try catch + 手动ack + 死信队列处理

控制最大重试次数

spring:

rabbitmq:

listener:

simple: # fanout类型交换机

retry:

enabled: true # 开启重试

max-attempts: 5 # 最大重试次数

initial-interval: 2000ms # 每次重试的时间间隔

/** 第一种方法:控制重试次数

* 配置文件添加如下信息:

* listener:

* simple:

* retry:

* enabled: true # 开启重试

* max-attempts: 5 # 最大重试次数

* initial-interval: 2000ms # 每次重试的时间间隔

*/

@RabbitListener(queues = {"fanout.order.queue"})

public void orderMqConsumer(String orderMsg, Channel channel,

CorrelationData correlationData,

@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{

// 1,获取队列中的消息

System.out.println("队列中的消息为:"+orderMsg);

// 2. 转换成json对象

MqOrder mqOrder = JSONObject.parseObject(orderMsg, MqOrder.class);

// 3. 获取订单id

String orderId = mqOrder.getOrderId();

System.out.println("消费端获取到订单id: "+orderId);

// ----------------------------------

// 如果此处出现异常,消费端会不停的请求MQ服务,造成死循环,进而导致服务器压力过大、宕机

System.out.println(1/0);

// ----------------------------------

// 4. 省略业务操作。。。。

}

手动ACK + try/catch + 死信队列

rabbitmq:

port: 5672

listener:

simple:

acknowledge-mode: manual # 开启手动ACK,让程序控制MQ消息的重发、删除、转移

/** 第二种方法:添加try/catch + 手动ACK

* 添加如下配置:

* listener:

* simple:

* acknowledge-mode: manual # 开启手动ACK,让程序控制MQ消息的重发、删除、转移

*/

@RabbitListener(queues = {"fanout.order.queue"})

public void orderMqConsumer2(String orderMsg, Channel channel,

CorrelationData correlationData,

@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{

try {

// 1,获取队列中的消息

System.out.println("队列中的消息为:"+orderMsg);

// ----------------如果此处出现异常------------------

System.out.println(1/0);

// ----------------------------------

// 2. 省略业务操作。。。。

} catch (Exception e) {

/* 如果出现异常,根据实际情况去进行重发;

重发一次后,根据自己业务场景,决定消息 丢失 or 日记 ;

参数1:消息的tag 参数2 :是否多条处理

参数3: requeue 重发

false (不会重发) 会把消息转移到死信队列

true (消息重发) 建议如果使用true,不要加 try/catch; 否则会造成死循环

*/

channel.basicNack(tag,false,false);

}

}

// -------------------- 创建queue的时候,绑定死信交换机 -------------------

// 如果出现异常或超过长度,则转移到 死信交换机--死信队列

@Bean

public Queue orderQueue(){

Map args = new HashMap<>();

// 绑定死信交换机 :dead_direct_exchange

args.put("x-dead-letter-exchange", "dead_direct_exchange");

// 因为死信队列是direct模式。通过 routing key,配置要发送给死信队列的哪一个队列。

args.put("x-dead-letter-routing-key", "dead_message");

return new Queue("fanout.order.queue",true,false,false,args);//交换机名称,是否持久化

}

// ------------------- 消费死信队列:-------------------

// 上面的消息消费失败后,会转移到这儿:

@RabbitListener(queues = {"dead_direct_queue"})

public void orderDeadMqConsumer(String orderMsg, Channel channel,

CorrelationData correlationData,

@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{

try {

// 1, 处理相关业务

// 2----------------如果此处出现异常------------------

System.out.println("进入死信队列消费");

System.out.println(1/0);

// ----------------------------------

} catch (Exception e) {

// 死信队列如果还出现异常,则进行特殊处理:

// 比如发短信、发邮件、存入数据库等

}

}

十八、集群配置详解

参考文章:配置详解

rabbitmq:

addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)

# port:

##集群配置 addresses之间用逗号隔开

# addresses: ip:port,ip:port

password: admin

username: 123456

virtual-host: / # 连接到rabbitMQ的vhost

requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s

publisher-confirms: #是否启用 发布确认

publisher-reurns: # 是否启用发布返回

connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时

cache:

channel.size: # 缓存中保持的channel数量

channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel

connection.size: # 缓存的连接数,只有是CONNECTION模式时生效

connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION

listener:

simple.auto-startup: # 是否启动时自动启动容器

simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto

simple.concurrency: # 最小的消费者数量

simple.max-concurrency: # 最大的消费者数量

simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.

simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.

simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)

simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒

simple.retry.enabled: # 监听重试是否可用

simple.retry.max-attempts: # 最大重试次数

simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔

simple.retry.multiplier: # 应用于上一重试间隔的乘数

simple.retry.max-interval: # 最大重试时间间隔

simple.retry.stateless: # 重试是有状态or无状态

template:

mandatory: # 启用强制信息;默认false

receive-timeout: # receive() 操作的超时时间

reply-timeout: # sendAndReceive() 操作的超时时间

retry.enabled: # 发送重试是否可用

retry.max-attempts: # 最大重试次数

retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔

retry.multiplier: # 应用于上一重试间隔的乘数

retry.max-interval: #最大重试时间间隔

附录

消费者在接受消息的时候,出现异常,会导致什么样的问题?应该怎么去进行处理?

【问题】:导致死循环,服务的消息重试投递【思路】:

控制重发的次数 + 死信队列 设置了重试的次数,如果已达到最大的重试次数且没有成功处理,MQ会将消息进行抛弃。 为了避免消息丢失,我们就需要把消息放入到死信队列中。try+catch+手动ack+死信队列 这里有个注意点就是,使用了 try+catch 且 yaml 配置了重试次数:

basicNack() 里面参数 requeue 为 true 还是会一直重发。basicNack() 里面参数 requeue 为 false 的话,就不会再根据yaml里面配置的重试次数进行重试,也就是对yaml里面配置的全局重试次数进行覆盖,即:单个配置冲掉全局配置。

相关阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式