文章目录

一、rabbitmq配置1.1 环境变量1.1.1 默认内置规则1.1.2 Shell环境变量1.1.3 常见环境变量

1.2 配置文件1.2.1 rabbitmq.conf文件下载流程1.2.2 常用配置项1.2.2.1 网络相关1.2.2.2 访问策略1.2.2.3 虚拟空间设置1.2.2.4 网络协议相关1.2.2.5 资源流量限制相关1.2.2.6 集群相关1.2.2.7 数据收集参数1.2.2.8 管理相关1.2.2.9 配置加密

1.3 参数策略1.3.1 常见Parameter命令用法1.3.1.1 设置参数1.3.1.1.1 set_parameter设置(绑定vhost)1.3.1.1.2 set_global_parameter设置(不绑定vhost)

1.3.1.2 查看参数1.3.1.2.1 list_parameters查看(绑定vhost)1.3.1.2.2 list_global_parameters查看(不绑定vhost)

1.3.1.3 清除参数1.3.1.3.1 clear_parameter清除(绑定vhost)1.3.1.3.2 clear_global_parameter清除(不绑定vhost)

1.3.2 特殊Parameter命令用法——Policy1.3.2.1 设置Policy1.3.2.2 查看Policy1.3.2.3 清除Policy

二、Federation插件2.1 Federation的使用场景2.2 联邦交换器2.2.1 原理机制2.2.2 异地均摊消费

2.3 联邦队列2.3.1 原理机制2.3.2 特性一(联邦队列间无限次转发)2.3.3 特性二(存有阻塞)2.3.4 特性三(不具备传递性)

2.4 Federation的使用2.4.1 查看两边需要通信的交换器/队列2.4.2 开启Federation插件2.4.3 定义Federation upstream2.4.4 定义Policy2.4.5 查看结果2.4.6 测试效果2.4.6.1 上游信息同步给本地2.4.6.2 本地新建policy并在上游生成交换器和队列2.6.4.2.1 数据单向传递2.4.6.2.2 上游交换器里的数据传递给本地

一、rabbitmq配置

从前面的操作来看,我们并没有用到配置相关的东西,比如修改配置文件?比如运行时指定某某参数?其实,大多数情况下我们默认启动rabbitmq服务时就是用的默认参数,并且也够用了。但是对于一名合格的运维来说,需要全面掌握每一个细节,才能更加高效的操作rabbitmq。当然也可以从系统调优层面上来调节系统范围内的参数来达到定制化的需求。RabbitMQ三种定制化服务方式:

环境变量(Enviroment Variables )。RabbitMQ服务端参数可以通过环境变量进行配置,比如服务节点名称、RabbitMQ配置文件地址、节点内部通信端口等。配置文件(Configuration File)。可以定义RabbitMQ服务和插件设置,比如TCP监听端口、以及其他网络相关设置、内存限制、磁盘限制等。运行时指定参数和策略 (Runtime Parameters and Policies )。可以在运行时定义集群层面的服务设置。

1.1 环境变量

RabbitMQ的环境变量都是以"RABBITMQ_”开头的。之前有rabbitmg-env.conf文件也可以设置,不过现在高版本没有了,大部分情况下都是直接在 Shell 环境中设置,所以不做过多介绍。变量优先级: Shell 环境变量 > rabbitmg-env.conf 配置文件 > 默认内置规则。

1.1.1 默认内置规则

当shell环境没有定义RABBITMQ_NODENAME时,则直接采用默认规则“rabbit@hostname”。

[root@node1 ~]# rabbitmq-server -detached

[root@node1 rabbitmq]# rabbitmqctl status|grep name

Node name: rabbit@node1

1.1.2 Shell环境变量

当配置了shell环境变量时,则最优先读取shell里的设置变量。

#比如设置节点名称为baimu。

[root@node1 rabbitmq]# tail -1 /etc/profile

export RABBITMQ_NODENAME=baimu

[root@node1 ~]# rabbitmqctl status|grep name

Node name: baimu@node1

1.1.3 常见环境变量

一般情况下不建议随意更改变量,除非像集群里设寄到多个节点时,需要指定日志等文件的存放路径。

变量名称描述RABBITMQ_NODE_IP_ADDRESS绑定某个特定的网络接口。默认值是空字符串,即绑定到所有网络接口上。如果要绑定两个或者更多的网络接口,可以参考 rabbitmq.config 中的 tcp_listeners 配置 。RABBITMQ_NODE_PORT监听客户端连接的端口号,默认为 5672RABBITMQ_DIST_PORTRabbitMQ节点内部通信的端口号,默认值为:RABBITMQ_NODE_PORT+20000,即25672。如果设置了 kernel.inet_dist_listen_min 或者 kernel.inect_dist_listen_max时,此环境变量将被忽略。RABBITMQ_NODENAMERabbitMQ的节点名称,默认为 rabbit@SHOSTNAME。在每个 Erlang 节点和机器的组合中,节点名称必须唯一。RABBITMQ_CONF_ENV_FILERabbitMQ 环境变量的配置文件(rabbitmg-env.conf)的地址,默认值为$RABBITMQ_HOME/etc/rabbitmg/rabbitmq-env.conf注意这里与 RabbitMQ配置文件 rabbitmg.config 的区别。RABBITMQ_USE_LONGNAME如果当前的 hostname 为 nodel.longname,那么默认情况下创建的节点名称为rabbit@node1,将此参数设置为 true 时,创建的节点名称就为rabbit@nodel.longname,即使用了长名称命名。默认值为空。RABBITMQ_CONFIG_FILERabbitMQ 配置文件 (rabbitmq.config)的路径,注意没有“.config”的后缀。默认值为$RABBITMQ_HOME/etc/rabbitmq/rabbitmqRABBITMQ_MNESIA BASERABBITMQ_MNESIA_DIR 的父目录。除非明确设置了 RABBITMQ_MNESIA_DIR目录,否则每个节点都应该配置这个环境变量。默认值为SRABBITMQ_HOME/var/lib/rabbitmg/mnesia,注意对于 RabbitMQ的操作用户来说,需要有对当前目录可读、可写、可创建文件及子目录的权限。RABBITMQ_MNESIA_DIR包含 RabbitMQ服务节点的数据库、数据存储及集群状态等目录,默认值为$RABBITMQ_MNESIA_BASE/ $RABBITMQ_NODENAME。RABBITMQ_LOG_BASERabbitMQ服务日志所在基础目录。默认值为$RABBITMQ_HOME/var/log/rabbitma。RABBITMQ_LOGSRabbitMQ服务与 Erlang 相关的日志,默认值为:$RABBITMQ_LOG_BASE/ $RABBITMQ_NODENAME.Iog。RABBITMQ_SASL_LOGSRabbitMQ 服务于 Erlang 的 SASL(Sstem Application Support Libraries)相关的日志默认值为SRABBITMQ_LOG_BASE/$RABBITMQ_NODENAME-Sasl.log。RABBITMQ_PLUGINS_DIR插件所在路径。默认值为SRABBITMQ_HOME/plugins

1.2 配置文件

配置文件的位置取决于不同的操作系统和安装包,二进制安装的rabbitmq默认没有配置文件,如有需要可以在官网下载。可以通过rabbitmq日志查看配置文件位置。

1.2.1 rabbitmq.conf文件下载流程

进入rabbitmq官网,查看rabbitmq.conf文件示例。 会直接跳转到github对应下载地址上。下载地址

3. 将配置文件内容复制到本地,放在本机rabbitmq安装地址目录下的etc/rabbitmq/rabbitmq.conf。

1.2.2 常用配置项

1.2.2.1 网络相关

## Networking

## ====================

1. 默认监听端口,用于业务对接的接口,一般和第三方业务对接需要提供该端口。

# listeners.tcp.default = 5672

2. 监听本地ipv4格式的特地接口。

# listeners.tcp.local = 127.0.0.1:5672

3. 监听本地ipv6格式的特地接口。

# listeners.tcp.local_v6 = ::1:5672

4. 做监听器使用的端口及其需要监听的IP地址参数。

# listeners.tcp.other_port = 5673

# listeners.tcp.other_ip = 10.10.10.10:5672

5. ssl登陆的关键配置,后续客户端要连接5671端口。

# listeners.ssl.default = 5671

6. 禁用常规TCP(非tls)侦听器参数。客户未配置为使用TLS,且正确启用TLS的端口将无法使用连接到此节点。

# listeners.tcp = none

7. 控制TCP和TLS监听器连接Erlang进程数量。

# num_acceptors.tcp = 10

# num_acceptors.ssl = 10

8. 套接字写入器将强制GC每次传输的字节数,默认为1 GiB(' 1000000000 ')。

# socket_writer.gc_threshold = 1000000000

9. 设置为:disable,代表禁用。

# socket_writer.gc_threshold = off

10. AMQP 0-9-1 和 AMQP 1.0握手允许的最大时间量。

# handshake_timeout = 10000

11. 在接受连接时执行反向DNS查找,然后rabbitmqctl和管理UI将显示主机名,而不是IP地址。

# reverse_dns_lookups = false

1.2.2.2 访问策略

## Security, Access Control

## ==============

1. 默认“guest”用户只允许通过环回接口(例如localhost)访问服务器。

# loopback_users.guest = true

2. 当设置为false时,则其他电脑连接。

# loopback_users.guest = false

3. verify_peer参数表示要求验证对方证书。设置成false表示完全忽略验证证书的结果。

# ssl_options.verify = verify_peer

# ssl_options.fail_if_no_peer_cert = false

4. 服务端私钥和证书文件,不要修改。

# ssl_options.cacertfile = /path/to/cacert.pem

# ssl_options.certfile = /path/to/cert.pem

# ssl_options.keyfile = /path/to/key.pem

# ssl_options.honor_cipher_order = true

# ssl_options.honor_ecc_order = true

5. 推荐使用TLSv1.2,但不能使用TLSv1.3。如果启用TLSv1.3,需要删除这两行。(晦涩难懂,不能随意更改)

# ssl_options.client_renegotiation = false

# ssl_options.secure_renegotiate = true

6. 最前沿的TLS版本,需要最近的客户端运行时版本,并且没有与早期TLS版本相同的密码套件。(晦涩难懂,不能随意更改)

# ssl_options.versions.1 = tlsv1.3

7. 启用TLSv1.2以获得最佳兼容性。(晦涩难懂,不能随意更改)

# ssl_options.versions.2 = tlsv1.2

8. 限制服务器将为客户端TLS使用的密码套件连接,缩小范围可以阻止一些客户连接。如果启用TLSv1.3,密码套件被覆盖,TLSv1.3特定密码套件也必须显式启用。

只使用TLSv1.3密码套件示例:

# ssl_options.ciphers.1 = TLS_AES_256_GCM_SHA384

# ssl_options.ciphers.2 = TLS_AES_128_GCM_SHA256

# ssl_options.ciphers.3 = TLS_CHACHA20_POLY1305_SHA256

# ssl_options.ciphers.4 = TLS_AES_128_CCM_SHA256

# ssl_options.ciphers.5 = TLS_AES_128_CCM_8_SHA256

# ssl_options.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384

# ssl_options.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384

# ssl_options.ciphers.3 = ECDHE-ECDSA-AES256-SHA384

# ssl_options.ciphers.4 = ECDHE-RSA-AES256-SHA384

# ssl_options.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384

# ssl_options.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384

# ssl_options.ciphers.7 = ECDH-ECDSA-AES256-SHA384

# ssl_options.ciphers.8 = ECDH-RSA-AES256-SHA384

# ssl_options.ciphers.9 = DHE-RSA-AES256-GCM-SHA384

# ssl_options.ciphers.10 = DHE-DSS-AES256-GCM-SHA384

# ssl_options.ciphers.11 = DHE-RSA-AES256-SHA256

# ssl_options.ciphers.12 = DHE-DSS-AES256-SHA256

# ssl_options.ciphers.13 = ECDHE-ECDSA-AES128-GCM-SHA256

# ssl_options.ciphers.14 = ECDHE-RSA-AES128-GCM-SHA256

# ssl_options.ciphers.15 = ECDHE-ECDSA-AES128-SHA256

# ssl_options.ciphers.16 = ECDHE-RSA-AES128-SHA256

# ssl_options.ciphers.17 = ECDH-ECDSA-AES128-GCM-SHA256

# ssl_options.ciphers.18 = ECDH-RSA-AES128-GCM-SHA256

# ssl_options.ciphers.19 = ECDH-ECDSA-AES128-SHA256

# ssl_options.ciphers.20 = ECDH-RSA-AES128-SHA256

# ssl_options.ciphers.21 = DHE-RSA-AES128-GCM-SHA256

# ssl_options.ciphers.22 = DHE-DSS-AES128-GCM-SHA256

# ssl_options.ciphers.23 = DHE-RSA-AES128-SHA256

# ssl_options.ciphers.24 = DHE-DSS-AES128-SHA256

# ssl_options.ciphers.25 = ECDHE-ECDSA-AES256-SHA

# ssl_options.ciphers.26 = ECDHE-RSA-AES256-SHA

# ssl_options.ciphers.27 = DHE-RSA-AES256-SHA

# ssl_options.ciphers.28 = DHE-DSS-AES256-SHA

# ssl_options.ciphers.29 = ECDH-ECDSA-AES256-SHA

# ssl_options.ciphers.30 = ECDH-RSA-AES256-SHA

# ssl_options.ciphers.31 = ECDHE-ECDSA-AES128-SHA

# ssl_options.ciphers.32 = ECDHE-RSA-AES128-SHA

# ssl_options.ciphers.33 = DHE-RSA-AES128-SHA

# ssl_options.ciphers.34 = DHE-DSS-AES128-SHA

# ssl_options.ciphers.35 = ECDH-ECDSA-AES128-SHA

# ssl_options.ciphers.36 = ECDH-RSA-AES128-SHA

1.2.2.3 虚拟空间设置

## Default User / VHost

## ====================

1. 第一次启动RabbitMQ将创建一个vhost和一个用户及密码。

# default_vhost = /

# default_user = guest

# default_pass = guest

2. 指定用户对应权限,可配置、可读、可写。

# default_permissions.configure = .*

# default_permissions.read = .*

# default_permissions.write = .*

3. 是否开启用户角色标签。

# default_user_tags.administrator = true

4. 开启定义以他用户角色标签。

# default_user_tags.management = true

# default_user_tags.custom_tag = true

1.2.2.4 网络协议相关

## Additional network and protocol related configuration

## =====================================================

##

1. 设置服务器AMQP 0-9-1心跳超时时间,单位为秒。

## RabbitMQ节点将发送心跳帧(timeout / 2)时间间隔。两次错过的心跳客户端将关闭连接。

## 低于6秒的值很可能产生假阳性,不建议使用。

# heartbeat = 60

2. 设置AMQP帧的最大允许大小(以字节为单位)。

# frame_max = 131072

3. 设置服务器在连接前接受的最大帧大小,调优参数。

# initial_frame_max = 4096

4. 设置每个连接允许的最大通道数,0表示“无限制”。

# channel_max = 128

5. 设置队列未接受连接的最大数目。当达到此值时,新连接会被拒绝。

# tcp_listen_options.backlog = 128

6. 设置为ture,则禁用Nagle算法,默认设置为true,也推荐设置为true。

# tcp_listen_options.nodelay = true

# tcp_listen_options.exit_on_close = false

7. 设置为ture,则启用TCP的存活时间。

# tcp_listen_options.keepalive = true

# tcp_listen_options.send_timeout = 15000

8. 设置TCP缓冲区大小。

# tcp_listen_options.buffer = 196608

# tcp_listen_options.sndbuf = 196608

# tcp_listen_options.recbuf = 196608

1.2.2.5 资源流量限制相关

## Resource Limits & Flow Control

## ==============================

1. 设置基于内存的流量控制阈值。

# vm_memory_high_watermark.relative = 0.4

2. 设置节点使用的RAM的限制(字节)。

# vm_memory_high_watermark.absolute = 1073741824

3. 从Rabbitmq 3.6.0版本开始,你可以使用内存单位设置绝对值。

# vm_memory_high_watermark。absolute = 2GB

##支持的单元符号:

## k, kiB: kibibytes (2^10 - 1,024 bytes)

## M, MiB: mebibytes (2^20 - 1,048,576 bytes)

## G, GiB: gibibytes (2^30 - 1,073,741,824 bytes)

## kB: kilobytes (10^3 - 1,000 bytes)

## MB: megabytes (10^6 - 1,000,000 bytes)

## GB: gigabytes (10^9 - 1,000,000,000 bytes)

4. 设置队列开始的高水位限制的百分比,将页消息输出到磁盘以释放内存。

##例如,当vm_memory_high_watermark设置为0.4,该值设置为0.5时,分页可以在节点使用总可用RAM的20%时就开始。大于1.0的值可能是危险的,应谨慎使用。

##一个替代方案是使用持久队列并发布消息作为持久的(传递模式= 2)更快地将消息移动到磁盘。另一种替代方法是配置队列来分页所有消息(两者都可以)持久和瞬时到磁盘。

# vm_memory_high_watermark_paging_ratio = 0.5

5. 选择Erlang虚拟机内存消耗计算策略。(' allocated '、 ' rss '、' legacy '(别名为' erlang ')。' rss '是3.6.12时的默认值。

# vm_memory_calculation_strategy = rss

6. 设置执行内存检查的时间间隔(以毫秒为单位)

# memory_monitor_interval = 2500

7. 设置可用的总内存可以从操作系统资源中计算。可以时默认选项,也可以作为配置参数提供。

# total_memory_available_override_value = 2GB

8. 设置磁盘空闲限制(字节)。

# disk_free_limit.absolute = 50000

9. 从RabbitMQ 3.6.0版本开始,自定义设置磁盘空闲限制大小。

# disk_free_limit.absolute = 500KB

# disk_free_limit.absolute = 50mb

# disk_free_limit.absolute = 5GB

10. 设置一个相对于总可用RAM的限制,低于1.0比较危险,谨慎使用。

# disk_free_limit.relative = 2.0

1.2.2.6 集群相关

## Clustering

## =====================

1. 设置网络分区处理策略,一共有四种(ignore模式、pause-minority模式、pause-if-all-down模式、autoheal模式),默认为ignore模式。

# cluster_partition_handling = ignore

2. 设置暂停分区少数部分的所有节点。集群节点数必须为奇数。

# cluster_partition_handling = pause_minority

3. 若设置为pause_if_all_down模式策略,还需要额外配置。

# cluster_partition_handling = pause_if_all_down

4. 恢复策略。可以是autoheal,或者是ignore。

# cluster_partition_handling.pause_if_all_down.recover = ignore

5. 设置集群节点名称。

# cluster_partition_handling.pause_if_all_down.nodes.1 = rabbit@localhost

# cluster_partition_handling.pause_if_all_down.nodes.2 = hare@localhost

6. 从RabbitMQ 3.6.0版本开始,可以设置镜像同步批处理大小,但总批大小(字节)不得超过2 GiB。

# mirroring_sync_batch_size = 4096

7. 集群首次启动设置。

# cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config

# cluster_formation.classic_config.nodes.1 = rabbit1@hostname

# cluster_formation.classic_config.nodes.2 = rabbit2@hostname

# cluster_formation.classic_config.nodes.3 = rabbit3@hostname

# cluster_formation.classic_config.nodes.4 = rabbit4@hostname

# cluster_formation.peer_discovery_backend = rabbit_peer_discovery_dns

# cluster_formation.dns.hostname = discovery.eng.example.local

8. 配置节点类型。默认使用'disc'。

# cluster_formation.node_type = disc

9. 设置发送keepalive消息的间隔(以毫秒为单位)到其他集群成员。

# cluster_keepalive_interval = 10000

1.2.2.7 数据收集参数

## Statistics Collection

## =====================

1. 设置统计信息收集间隔(单位:毫秒)

# collect_statistics_interval = 5000

1.2.2.8 管理相关

# Management section

# =======================================

1. 管理页面端口,绑定可以支持tcp协议的IP。

# management.tcp.port = 15672

# management.tcp.ip = 0.0.0.0

2. web页面管理相关参数。

# management.tcp.shutdown_timeout = 7000

# management.tcp.max_keepalive = 120

# management.tcp.idle_timeout = 120

# management.tcp.inactivity_timeout = 120

# management.tcp.request_timeout = 120

# management.tcp.compress = true

3. HTTP监听器和嵌入式Web服务器设置。

# management.ssl.port = 15671

# management.ssl.cacertfile = /path/to/ca_certificate.pem

# management.ssl.certfile = /path/to/server_certificate.pem

# management.ssl.keyfile = /path/to/server_key.pem

4. 更多TLS选项

# management.ssl.honor_cipher_order = true

# management.ssl.honor_ecc_order = true

# management.ssl.client_renegotiation = false

# management.ssl.secure_renegotiate = true

## Supported TLS versions

# management.ssl.versions.1 = tlsv1.2

5. 服务器允许使用的密码套件:

# management.ssl.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384

# management.ssl.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384

# management.ssl.ciphers.3 = ECDHE-ECDSA-AES256-SHA384

# management.ssl.ciphers.4 = ECDHE-RSA-AES256-SHA384

# management.ssl.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384

# management.ssl.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384

# management.ssl.ciphers.7 = ECDH-ECDSA-AES256-SHA384

# management.ssl.ciphers.8 = ECDH-RSA-AES256-SHA384

# management.ssl.ciphers.9 = DHE-RSA-AES256-GCM-SHA384

6. HTTP API和管理UI的URL路径前缀。

# management.path_prefix = /a-prefix

7. 设置聚合数据的长度(例如消息速率和队列)长度被保留。(分、时、日)

# management.sample_retention_policies.global.minute = 5

# management.sample_retention_policies.global.hour = 60

# management.sample_retention_policies.global.day = 1200

# management.sample_retention_policies.basic.minute = 5

# management.sample_retention_policies.basic.hour = 60

# management.sample_retention_policies.detailed.10 = 5

1.2.2.9 配置加密

配置文件中有一些敏感的配置项可以被加密,然后在 RabbitMQ 启动时可以对这些项进行解密。对这些项进行加密并不是意味着系统的安全性增强了,而是遵从一些必要的规范,让一些敏感的数据不会出现在文本形式的配置文件中。在配置文件中将加密之后的值以“{encrypted,加密的值)”形式包裹,比如下面的示例中使用口令“adjhsajk”将密码“guest”加密。默认情况下,加密机制 PBKDF2 用来从口令中派生出密钥。默认的 Hash 算法是 SHA512默认的迭代次数是 1000默认的加密算法为 AES 256 CBC

我们也可以通过encode的其他参数来自定义。

rabbitmgctl encode 的完整命令为: rabbitmqctl encode [ --decode ] [Kvalue>] [ < passphrase > ) [–list-ciphers]–list-hashes) --cipher < cipher >] --hash < hash > --iterations < iterations >

[–list-ciphers]:罗列当前 RabbitMQ所支持的加密算法。[–list-hashes]:罗列当前 RabbitMQ所支持的Hash 算法。

1.3 参数策略

当我们在使用某项配置不需要同步到集群中的其他节点中,或者某项配置需要在运行时更改,这时候就不适合通过修改rabbitmq.conf配置文件来玩。因为 rabbitmq.config文件需要重启 Broker 才能生效。这种类型的配置在 RabbitMQ中的另一种称呼为,参数 (Parameter),也称运行时参数(Runtime Parameter)。Parameter 可以通过 rabbitmqctl 工具或者 RabbitMQ Management 插件提供的 HTTPAPI接口来设置。RabbitMQ 中一共有两种类型的 Parameter。vhost 级别的 Parameter 和 global 级别的 Parameter。不管是 vhost 级别还是 global 级别的参数,其所对应的值都是JSON 类型的。

vhost 级别的 Parameter 由一个组件名称 (component name)、名称 (name) 和值(value) 组成,global 级别的参数由一个名称和值组成。 比如,Federation upstream 是一个 vhost 级别的Parameter,它用来定义 Federation link 的上游信息,其对应的 Parameter 的组件名称为“federation-upstream”,名称对应于其自身的名称,而值对应于与上游的相关的连接参数等:对于Shovel而言也可以通过 Parameter 设置,其对应组件名称为“shovel”。vhost级别的参数对应的rabbitmqctl相关命令有三种:

set_parameterlist_parametersclear_parameter

1.3.1 常见Parameter命令用法

1.3.1.1 设置参数

1.3.1.1.1 set_parameter设置(绑定vhost)

命令格式:rabbitmqctl set_parameter [-p vhost] {component_name} {name} {valuel}

{component_name}:表示待设置参数的组件名称。{name}: 表示待设置的参数名称。{value}:表示待设置的参数值,是一个JSON项。

开启rabbitmq_federation插件。

[root@node1 rabbitmq]# rabbitmq-plugins enable rabbitmq_federation

将默认vhost “/”的federation-upstream组件的参数f1的值设置为JSON项。

[root@node1 rabbitmq]# rabbitmqctl set_parameter federation-upstream f1 '{"uri":"amqp://qingjun:citms@192.168.130.128:5672","ack-mode":"on-confirm"}'

1.3.1.1.2 set_global_parameter设置(不绑定vhost)

命令格式:rabbitmqctl set_global_parameter {name} {value}

类似set_parameter,但此处的key-value不绑定vhost。

[root@node1 rabbitmq]# rabbitmqctl set_global_parameter mqtt_default_vhosts '{"0=client,CN=qingjun":"/"}'

1.3.1.2 查看参数

1.3.1.2.1 list_parameters查看(绑定vhost)

命令格式:rabbitmqctl list_parameters [-p vhost]

[root@node1 rabbitmq]# rabbitmqctl list_parameters -p /

1.3.1.2.2 list_global_parameters查看(不绑定vhost)

命令格式:rabbitmqctl list_global_parameters

类似于 list_parameters,但是该命令不绑定于任何vhost

[root@node1 rabbitmq]# rabbitmqctl list_global_parameters

Listing global runtime parameters ...

name value

mqtt_default_vhosts {"0=client,CN=qingjun":"/"}

internal_cluster_id "rabbitmq-cluster-id-RGju8hDGnxgSBG2VBR0j7A"

1.3.1.3 清除参数

1.3.1.3.1 clear_parameter清除(绑定vhost)

命令格式:rabbitmqctl clear_parameter [-p vhost] {componenet_name} {key}

{componenet_name}:表示待清理的组件名称。{key}:表示待清理的参数名称。

[root@node1 rabbitmq]# rabbitmqctl clear_parameter -p / federation-upstream f1

1.3.1.3.2 clear_global_parameter清除(不绑定vhost)

命令格式:rabbitmqctl clear_global_parameter {name}

类似于 clear_parameter,但是此 key-value 对并不绑定于vhost。

[root@node1 rabbitmq]# rabbitmqctl clear_global_parameter mqtt_default_vhosts

Clearing global runtime parameter "mqtt_default_vhosts" ...

1.3.2 特殊Parameter命令用法——Policy

除了一些固定的参数 (比如 durable 或者 exclusive),客户端在创建交换器或者队列的时候可以配置一些可选的属性参数来获得一些不同的功能,比如 x-message-ttlx-expires、x-max-length 等。通过客户端设定的这些属性参数一旦设置成功就不能再改变(不能修改也不能添加),除非删除原来的交换器或队列之后再重新创建新的。

Policy是一种特殊的 Parameter 的用法。Policy是vhost级别的。一个 Policy 可以匹配一个或者多个队列(或者交换器,或者两者兼有)。Policy 也可以支持动态地修改一些属性参数,大大地提高了应用的灵活度。一般来说,Policy 用来配置 Federation、镜像、备份交换器、死信等功能。

常见参数:

virtual host:表示当前 Policy 所在的 vhost 是哪个。Name:表示当前 Policy 的名称。Pattern:一个正则表达式,用来匹配相关的队列或者交换器Apply to:用来指定当前 Policy 作用于哪一方。一共有三个选项:

“Exchanges andqueues”:表示作用与 Pattern 所匹配的所有队列和交换器。“Exchanges”:表示作用于与Pattern 所匹配的所有交换器。“Queues”表示作用于与 Pattern 所匹配的所有队列。 Priority:定义优先级。如果有多个 Policy 作用于同一个交换器或者队列,那么Priority 最大的那个 Policy 才会有用。Definition:定义一组或者多组键值对,为匹配的交换器或者队列附加相应的功能。

1.3.2.1 设置Policy

命令格式:rabbitmqctl set_policy [-p vhost] [- -priority priority] [- -apply-to apply-to] {name} {pattern} {definition}

{name}: 表示策略名称;{pattern}:表示当匹配到给定资源的正则表达式,使的该策略得以应用;{definition}:表示策略的定义,作为一个JSON项,在多数shell中,你很可能需要去应用它{priority}:表示策略的优先级的整数,数据越大表示优先级越高,默认值为0{apply_to}:表示策略应该应用的类型: queues/exchange/all,默认值是 all

#设置默认的 vhost 中所有以“^qingjun.”开头的交换器为联邦交换器。

[root@node1 rabbitmq]# rabbitmqctl set_policy --apply-to exchanges --priority 1 p1 "^qingjun." '{"federation-upstream":"f1"}'

Setting policy "p1" for pattern "^qingjun." to "{"federation-upstream":"f1"}" with priority "1" for vhost "/" ...

1.3.2.2 查看Policy

命令格式:rabbitmqctl list_policies [-p vhost]

#列出默认vhost中所有的Policy。

[root@node1 rabbitmq]# rabbitmqctl list_policies

Listing policies for vhost "/" ...

vhost name pattern apply-to definition priority

/ heihei ^qingjun.* all {"ha-mode":"all","ha-sync-mode":"automatic"} 0

/ ha ^ all {"ha-mode":"all","ha-sync-mode":"automatic"} 0

/ p1 ^qingjun. exchanges {"federation-upstream":"f1"} 1

1.3.2.3 清除Policy

命令格式:rabbitmqctl clear_policy [-p vhost] {name}

[root@node1 rabbitmq]# rabbitmqctl clear_policy p1

Clearing policy "p1" on vhost "/" ...

如果两个或多个 Policy 都作用到同一个交换器或者队列上,且这些 Policy 的优先级都是样的,则参数项最多的 Policy 具有决定权。如果参数一样多,则最后添加的 Policy 具有决定权。关于Policy咱们先简单认识一下,后面讲镜像队列时再深度讨论。

二、Federation插件

Rabbitmq可以通过3种方式实现分布式部署:集群、Federation和Shovel。这三种方式并不互斥,可以根据需要选择其中一种或以几种方式的组合来达到分布式部署的目的。Federation和Shovel可以为Rbbitmq的分布式部署提供更高的灵活性,但部署起来也变得异常复杂。下面我们就一起来了解了解其中之一的Federation插件。

2.1 Federation的使用场景

Federation 插件的设计目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建立集群。

适用场景:

Federation 插件能够在不同管理域(可能设置了不同的用户和 host,也可能运行在不同版本的 RabbitMQ 和 Erlang 上) 中的 Broker 或者集群之间传递消息。Federation 插件基于 AMQP O-9-1 协议在不同的 Broker 之间进行通信,并设计成能够容忍不稳定的网络连接情况。一个 Broker 节点中可以同时存在联邦交换器(或队列) 或者本地交换器(或队列),只需要对特定的交换器(或队列) 创建 Federation 连接 (Federation link)。Federation不需要在多个 Broker 节点之间创建 O(N^2)个连接(尽管这是最简单的使用方式),这也就意味着 Federation 在使用时更容易扩展。Federation 插件可以让多个交换器或者多个队列进行联邦。对于默认的交换器(每个 vhost 下都会默认创建一个名为“”的交换器)和内部交换器而言,不能对其使用 Federation 的功能。

2.2 联邦交换器

如下图,联邦交换器能够将原本发送给上游交换器 (upstream exchange)的消息路由到本地的某个队列中。

2.2.1 原理机制

角色背景:

broker_2为上游rabbitmq broker,broker_1为本地服务,需要把brker_2中的消息路由到broker_1中的某个队列中。

工作流程:

Federation插件部署在本地broker_1中,该机器上有个交换器exchange_A,通过绑定键rkA与队列queue_A进行绑定。Federation插件部署之后,会把broker_1中的交换器exchange_A与broker_2之间建立一条单向的Federation link。经过Federation link转发的消息会带有一种特殊的headers属性标志。Federation插件会在broker_2上建立一个同名的交换器exchange_A,同时建立一个内部交换器“exchange_A —> broker_1 B”,并通过路由键rkA将这两个交换器绑定。

同名交换器“exchange_A”名称可以自己修改,默认同名。内部交换器 “exchange_A —> broker_1 B”名字中的“broker_1”是集群名称,也可以修改。 Federation插件在broker_2上建立一个队列“federation:exchange_A —> broker_1 B”,并与内部交换器“exchange_A —> broker_1 B”进行绑定。Federation插件在队列“federation:exchange_A —> broker_1 B”与broker_1上的交换器exchangde_A之间建立一条AMQP连接,通过这条AMQP连接实时消费队列“federation:exchange_A —> broker_1 B”中的数据。broker_1中的交换器exchange_A拿到消息数据会通过绑定键rkA,把消息传递给路由键queue_A,从而达到本地路由键消费异地交换器的数据。

2.2.2 异地均摊消费

从上面的结构图中有一个点不难看出,就是如果此时还有另外一个客户端client_2通来消费队列“federation:exchange_A —> broker_1 B”的消息,那么生产者发往 broker_2中 exchange_A 的消息会有部分(一半) 被client_2消费掉,而另一半会发往 broker_1的 exchange_A。倘若业务应用有要求,生产者所有发往 broker_2中 exchange_A 的消息都要转发至 broker_1的exchange_A 中,那么此时就要注意队列“federation:exchange_A —> broker_1 B”就不能被其他消费者消费。

对于“异地均摊消费”这种特殊需求,我们可以把联邦交换器变成另外一个交换器的上游交换器。如下图,两方的交换器可以互为federated exchange和upstream exchange。

两方交换器互为federated exchange和upstream exchange。其中参数“max_hops=1”表示一条消息最多被转发的次数为1。

对于联邦交换器来说,还有非常多的逻辑部署方式。比如三个上游交换器互为联邦交换器,成“三足鼎立”形式,还有环形,树形等等,这里就不一一举例了。

2.3 联邦队列

除了联邦交换器,RabbitMQ 还可以支持联邦队列(federated queue)。联邦队列可以在多个Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列 (upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

2.3.1 原理机制

工作流程:

broker_1为上游队列,broker_2为本地队列。并将broker_2中的队列queue_1、queue_2设为联邦队列。

队列 queuel和queue2 原本在 broker2中,。 Federation 插件会在 broker_1上创建同名的队列 queue_1和queue_2,并与broker_2中的队列 queue_1和queue_2分别建立两条单向独立的 Federation link。当有消费者连接 broker_2,并通过Basic.Consume 消费队列queue_1/ queue_2中的消息时,消费的消息分两种情况:

如果队列 queue_1/queue_2中本身有若干消息堆积,那么消费者会直接消费这些消息,此时 broker_2 中的queue_1 /或 queue_2不会拉取 broker_1中的 queue_1/queue_2的消息。如果队列 queue_1/queue_2中没有消息堆积或者消息被消费完了,那么它会通过 Federation link 拉取在broker_1中的上游队列 queue_1/queue_2中的消息(如果有消息),然后存储到本地,之后再被消费者进行消费。

优点:

消费者既可以消费broker_2中的队列,又可以消费broker_1中的队列,Federation的这种分布式队列的部署可以提升单个队列的容量。如果在上游的broker_1一端部署的消费者来不及消费队列queue_1中的消息,那么 broker_2一端部署的消费者可以为其分担消费,也可以达到某种意义上的负载均衡。

2.3.2 特性一(联邦队列间无限次转发)

特性:

一条消息可以在联邦队列间转发无限次。

原因:

队列中的消息除了被消费,还会转向有多余消费能力的一方,如果这种“多余的消费能力在 broker_1和 broker_2中来回切换,那么消费也会在 broker_1和broker_2中的队列queue中来回转发。

示例:

可以在其中一个队列上发送一条消息“qingjun”,然后再分别创建两个消费者Client_1和Client_2分别连接broker_1和 broker_2,并消费队列 queue 中的消息,但是并不需要确认消息 (消费完消息不需要调用 Basic.Ack)。来回开启/关闭 Client_1和 Client_2可以发现消息“qingjun”会在 broker_1和 broker_2之间串来串去。

2.3.3 特性二(存有阻塞)

特性:

本地队列不能直接拉取上游队列消息。

原因:

如下图,当broker_2的队列queue_2没有消息堆积或者消息被消费完之后并不能通过Basic.Get 来获取 broker_1中队列 queue_1的消息。因为 Basic.Get 是一个异步的方法,如果要从broker_1中队列 queue_1拉取消息,必须要阻塞等待通过 Federation link 拉取消息存入 broker_2中的队列 queue_2之后再消费消息,所以对于 federated queue 而言只能使用 Basic.Consume 进行消费。

2.3.4 特性三(不具备传递性)

特性:

federated queue 并不具备传递性。

原因:

如下图,队列queue_2 作为 federated queue 与队列 queue_1进行联邦,同时队列queue_2又作为队列queue_3的upstream queue,但是这样队列queue_1与queue_3之间并没有产生任何联邦的关系。如果队列queue_1中有消息堆积,消费者连接 broker_3消费queue_3中的消息,无论 queue_3 处于何种状态,这些消费者都消费不到 queue_1中的消息,除非 queue_2 有消费者。

2.4 Federation的使用

使用Federation插件流程:

在需要进行队列通信的rabbitmq服务器上开启Federation插件,如果是集群则需要全部开启。比如broker_1为上游rabbitmq服务器,broker_2为本地,现需要获取broker_1节点上的某交换器上所有的消息或者某交换器绑定的某一队列里的消息,那么就需要在broker_1和broker_2上均开启Federation插件。确定broker_1里的交换器和队列与broker_2的交换器和队列是否相同,而且必须是在同一个虚拟主机里。在broker_2上配置一个或多个Federation Upstreams,每个upstream 均定义了到broker_1节点的 Federation link。在broker_2上定义匹配交换器或者队列的一种/多种策略(Policy)。最后再做数据测试。

2.4.1 查看两边需要通信的交换器/队列

192.168.130.128节点上创建三个交换机和三个队列,并分别一一绑定:

qingjun_1_exchange与qingjun_1_queue绑定;qingjun_2_exchange与qingjun_2_queue绑定;qingjun_3_exchange与qingjun_3_queue绑定。 此时三个队列中都没有挤压数据,都为0。

192.168.130.129节点上创建三个交换机和三个队列,并分别一一绑定:

qingjun_1_exchange与qingjun_1_queue绑定;qingjun_2_exchange与qingjun_2_queue绑定;baimu_1_exchange与baimu_1_queue绑定。 此时三个队列中都没有挤压数据,都为0。

2.4.2 开启Federation插件

分别在在broker_1和broker_2所在的服务器上开启管理插件。

我这里列举的node1节点为上游服务器,node_2节点为本地Federation节点。

开启Federation插件。

[root@node1 ~]# rabbitmq-plugins enable rabbitmq_federation

开启Federation管理插件。

[root@node1 ~]# rabbitmq-plugins enable rabbitmq_federation_management

开启 rabbitmg_federation_management 插件之后,在 RabbitMQ 的管理界面中“Admin”的右侧会多出“Federation Status”和“Federation Upstreams”两个 Tab 页。

192.168.130.128节点Web显示页面,且在qingjun虚拟主机里有个最高权限的用户qingjun,密码为123456。

192.168.130.129节点Web显示页面,且在qingjun虚拟主机里有个最高权限的用户qingjun。

2.4.3 定义Federation upstream

有关 Federation upstream 的信息全部都保存在 RabbitMQ 的 Mnesia 数据库中,包括用户信息、权限信息、队列信息等。在 Federation 中存在 3 种级别的配置:

Upstreams:每个 upstream 用于定义与其他 Broker 建立连接的信息。Upstream sets: 每个 upstream set 用于对一系列使用 Federation 功能的 upstream 进行分组。Policies: 每一个 Policy 会选定出一组交换器,或者队列,亦或者两者皆有而进行限定,进而作用于一个单独的 upsteam 或者 upstream set 之上。

通用参数:

Name:定义这个upstream 的名称。(必填)URI:定义 upstream 的 AMQP 连接。(必填)Prefetch count:定义 Federation 内部缓存的消息条数,即在收到上游消息之后且在发送到下游之前缓存的消息条数。Reconnect delay:Federation link 由于某和原因断开之后需要等待多少秒开始重新建立连接。Acknowledgement Mode:定义 Federation link 的消息确认方式。共有3种: On-confirm(默认)、on-publish、no-ack。

On-confirm:表示在接收到下游的确认消息 (等待下游的 Basic.Ack) 之后再向上游发送消息确认,这个选项可以确保网络失败或者 Broker 宕机时不会丢失消息,但也是处理速度最慢的选项。On-publish:表示消息发送到下游后(并需要等待下游的 Basic.Ack)再向上游发送消息确认,这个选项可以确保在网络失败的情况下不会丢失消息,但不能确保 Broker 宕机时不会丢失消息。no-ack:表示无须进行消息确认,这个选项处理速度最快,但也最容易丢失消息。 Trust User-ID:设定 Federation 是否使用“Validated User-ID”这个功能。该选项功能是指发送消息时验证消息的 user id 的属性

设置为 false 或者没有设置,那么 Federation 会忽略消息的 user id 这个属性。设置为 true,那么Federation只会转发 user_id 为上游任意有效的用户的消息。示例解释:如果在连接 Broker 时所用的用户名为“root”,当发送“test user id”这条消息时设置的 user id的属性为“guest”,那么这条消息会发送失败,只有当 user id 设置为“root”时这条消息才会发送成功。

联邦交换器参数:

Exchange:指定upstream exchange 的名称,默认情况下和 federated exchange 同名。Max hops:指定消息被丢弃前在 Federation link 中最大的跳转次数。默认为1。注意即使设置max-hops 参数为大于1的值,同一条消息也不会在同一个 Broker中出现 2 次,但是有可能会在多个节点中被复制。Expires:指定Federation link 断开之后,federated queue所对应的 upstream queue的超时时间,默认为“none”表示为不删除,单位为 ms。这个参数相当于设置普通队列的 -expires 参数。设置这个值可以避免 Federation link 断开之后,生产者一直在向 brokerl 中的 exchangeA 发送消息,这些消息又不能被转发到 broker3 中而被消费掉,进而造成 brokerl 中有大量的消息堆积。Message TTI:为 federated queue 所对应的 upstream queue 设置,相当于普通队列的x-message-ttl 参数。默认为“none”,表示消息没有超时时间。HA policy:为 federated queue 所对应的 upstream queue设置,相当于普通队列的 x-ha-policy参数,默认为“none”,表示队列没有任何 HA。

联邦队列参数:

Queue:执行 upstream queue 的名称,默认情况下和 federated queue 同名。

在broker_2节点上定义第一个Federation Upstreams,名为qingjun_federation_1。

生成Upstreams成功。 URI填写示例:

2.4.4 定义Policy

在broker_2节点上定义第一个Policy,用于匹配交换器 exchangeA。 通用参数:

virtual host:表示当前 Policy 所在的 vhost 是哪个。Name:表示当前 Policy 的名称。Pattern:正则表达式,用来匹配相关的队列或者交换器Apply to:用来指定当前 Policy 作用于哪一方。一共有三个选项:

“Exchanges and queues”:表示作用与 Pattern 所匹配的所有队列和交换器。“Exchanges”:表示作用于与Pattern 所匹配的所有交换器。“Queues”表示作用于与 Pattern 所匹配的所有队列。 Priority:定义优先级。如果有多个 Policy 作用于同一个交换器或者队列,那么Priority 最大的那个 Policy 才会有用。Definition:定义一组或者多组键值对,为匹配的交换器或者队列附加相应的功能。

2.4.5 查看结果

这里只连接了192.168.130.128和192.168.130.129上相同的交换器和队列:

qingjun_1_exchange;qingjun_1_queue;qingjun_2_exchange;qingjun_2_queue 此时在192…168.130.128上还存在qingjun_3_exchange和qingjun_3_queue,在192.168.130.129上还存在baimu_1_exchange和baimu_1_queue。如果现在在128上创建baimu_1_exchange和baimu_1_queue,则需要更新Federation status。

2.4.6 测试效果

2.4.6.1 上游信息同步给本地

在上游broker_1的第一个交换器上发送一条消息“wuhan_1”。

2. broker_1上第一个交换器绑定的队列有这条消息。 3. 查看本地broker_2上与之连接的第一个交换机绑定的队列也获取到这条消息“wuhan_1”。

2.4.6.2 本地新建policy并在上游生成交换器和队列

在本地broker_2上创建第二个policy,baimu_policy_1。

2. 查看broker_2已经建立了baimu的连接。

3. 查看上游broker_1上已经建立了一个与本地broker_2一样的交换器和内部队列。

2.6.4.2.1 数据单向传递

在broker_2上的baimu_exchange_1发送一条消息“beijing_1”。

上游broker_1并没有受到这条"beijing_1"的消息,说明建立的federation link 是单向的,数据只能从上游传给本地。

2.4.6.2.2 上游交换器里的数据传递给本地

在上游broker_1生成的交换器里创建插入一条消息"beijing_2"

本地broker_2会收到此消息。

3. 但是在上游broker_1生成的交换器绑定的队列里发送一条消息“beijing_3”就只存在上有服务器里了,不会传递给本地broker_2。因为给上游broker_1生成的交换器和队列不绑定。

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: