目录

Kafka安全1 安全协议1.1 PALINTEXT1.2 SSL1.2.1 生成服务端证书1.2.2 生成客户端证书1.2.3 修改配置listenersadvertised.listenerslistener.security.protocol.mapinter.broker.listener.namesecurity.inter.broker.protocolcontrol.plane.listener.name

1.3 SASL_PLAINTEXT1.4 SASL_SSL

2 身份认证2.1 SSL身份认证2.1.1 SSL的性能2.1.2 配置客户端认证服务端2.1.3 配置服务端认证客户端ssl.client.auth=requiredssl.client.auth=requested

2.1.4 更新秘钥2.1.7 安全方面的考虑使用TLS高版本使用256位密钥密钥的安全保存缩短密钥寿命限制客户端配额延迟发送客户端响应

2.2 SASL身份验证2.2.1 JAAS2.2.2 SASL/GSSAPI2.2.2.1 设置SASL/GSSAPI2.2.2.2 安全方面的考虑

2.2.3 SASL/PLAIN2.2.3.1 配置SASL/PLAIN2.2.3.2 安全方面的考虑

2.2.4 SASL/SCRAM2.2.4.1 设置SASL/SCRAM2.2.4.2 安全方面的考虑

2.2.5 SASL/OAUTHBEARER2.2.5.1 配置SASL/OAUTHBEARER2.2.5.2 安全方面的考虑

2.2.6 委托令牌2.2.6.1 配置委托令牌2.2.6.2 安全方面的考虑

2.7 重新认证connections.max.reauth.ms=0

2.8 安全更新不停机

3 加密3.1 传输层加密3.2 数据存储加密3.3 端到端加密3.3.1 端到端加密流程3.3.2 密钥管理3.3.3 压缩3.3.4 消息键加密

4 授权4.1 自定义授权4.2 安全方面的考虑

5 审计6 配额7 Zookeeper安全7.1 SASL7.2 SSL7.3 授权

8 保护平台8.1 保护密码

附录1 JAAS1.1 简介1.2 核心类及接口1.2.1 认证SubjectPrincipalsCredentialsLoginContextLoginModuleCallbackHandlerCallback

1.2.2 授权PolicyAuthPermissionPrivateCredentialPermission

1.3 配置文件1.3.1 JAAS Login Configuration File1.3.2 Policy File

1.3 示例代码

2 PGP2.1 什么是 GPG

Kafka安全

安全性需要从系统整体层面考虑,Kafka提供了如下安全特性,可以与现有安全基础设施集成,构建出基于整个系统的安全。 包括:

身份认证 包括客户端认证服务端身份,服务端认证客户端身份认证授权 客户端写入主题权限,客户端读取主题权限,客户端访问消费组的权限加密 消息传输通道加密,磁盘加密,消息摘要防篡改审计 客户端执行的所哟操作都要被审计跟踪配额 防止客户端占用所有可用带宽,拒绝服务攻击

1 安全协议

Kafka使用两种标准技术(TLS,SASL)支持4种安全协议。 TLS(Transport Layer Security):传输层安全,SSL(Secure Socket Layer)安全套接字层作为其前身,支持传输加密和身份验证。 SASL(Simple Authentication Security Layer):简单身份验证和安全层,是一个在面向连接的协议中使用不同的机制实现按身份验证的框架。

Kafka共提供有4种可选安全协议,每一个安全协议都结合了TLS或SASL。

1.1 PALINTEXT

无传输层加密,无身份验证,只适合在私有网络传输非敏感数据。

1.2 SSL

支持传输层加密,SSL客户端及服务端身份验证,适用于不安全网络传输。 SSL协议是基于IKP(Public Key Infrastructure公钥基础设施)的协议,所以首先要生成证书。这里基于双向认证,介绍生成自签名证书的过程。

1.2.1 生成服务端证书

首先生成服务端CA密钥对,用来签发服务端证书。证书颁发机构(CA, Certificate Authority)即颁发数字证书的机构。是负责发放和管理数字证书的权威机构,并作为电子商务交易中受信任的第三方,承担公钥体系中公钥的合法性检验的责任。在这里,我们充当CA,给Kafka服务器和客户端签发证书。

keytool -genkeypair -keyalg RSA -keysize 2048 -keystore server.ca.p12 -storetype PKCS12 -storepass 123456 -keypass 123456 -alias ca -dname "CN=BrokerCA" -ext bc=ca:true -validity 36500

导出CA根证书,用来生成证书链,并安装进客户端的信任证书库:

keytool -export -file server.ca.crt -keystore server.ca.p12 -storetype PKCS12 -storepass 123456 -alias ca -rfc

接下来生成服务端密钥对:

keytool -genkey -keyalg RSA -keysize 2048 -keystore server.ks.p12 -storepass 123456 -keypass 123456 -alias server -storetype PKCS12 -dname "CN=Kafka,O=QuPeng,C=China" -validity 36500

导出证书请求文件,用于向CA申请证书:

keytool -certreq -file server.csr -keystore server.ks.p12 -storetype PKCS12 -storepass 123456 -keypass 123456 -alias server

CA用自己的密钥对签发一个服务端的证书:

keytool -gencert -infile server.csr -outfile server.crt -keystore server.ca.p12 -storetype PKCS12 -storepass 123456 -alias ca -ext SAN=dns:localhost,ip:127.0.0.1,ip:172.26.143.96 -validity 36500

将根证书和服务端证书都导入服务端密钥库:

cat server.crt server.ca.crt > serverchain.crt

keytool -importcert -file serverchain.crt -keystore server.ks.p12 -storepass 123456 -keypass 123456 -a

lias server -storetype PKCS12 -noprompt

如果集群的broker之间需要认证,将根证书导入信任密钥库:

keytool -import -file server.ca.crt -keystore server.ts.p12 -storetype PKCS12 -storepass 123456 -alias server -noprompt

将根证书导入客户端信任密钥库:

keytool -import -file server.ca.crt -keystore client.ts.p12 -storetype PKCS12 -storepass 123456 -alias ca -noprompt

1.2.2 生成客户端证书

过程与服务端证书类似。 生成客户端CA证书:

keytool -genkeypair -keyalg RSA -keysize 2048 -keystore client.ca.p12 -storetype PKCS12 -storepass 123456 -keypass 123456 -alias ca -dname CN=ClientCA -ext bc=ca:true -validity 36500

keytool -export -file client.ca.crt -keystore client.ca.p12 -storetype PKCS12 -storepass 123456 -alias ca -rfc

导出客户端证书请求:

keytool -genkey -keyalg RSA -keysize 2048 -keystore client.ks.p12 -storepass 123456 -keypass 123456 -alias client -storetype PKCS12 -dname "CN=Kafka Client App,O=QuPeng,C=China" -validity 36500

keytool -certreq -file client.csr -keystore client.ks.p12 -storetype PKCS12 -storepass 123456 -keypass 123456 -alias client

签发客户端证书:

keytool -gencert -infile client.csr -outfile client.crt -keystore client.ca.p12 -storetype PKCS12 -sto

repass 123456 -alias ca -validity 36500

将客户端根证书和客户端证书都导入密钥库:

cat client.crt client.ca.crt > clientchain.crt

keytool -importcert -file clientchain.crt -keystore client.ks.p12 -storepass 123456 -keypass 123456 -alias client -storetype PKCS12 -noprompt

将服务端证书导入信任证书库:

keytool -import -file client.ca.crt -keystore server.ts.p12 -alias client -storetype PKCS12 -storepass 123456 -noprompt

1.2.3 修改配置

服务端server.properties:

listeners=PLAINTEXT://172.26.143.96:9092,SSL://172.26.143.96:9093

advertised.listeners=PLAINTEXT://172.26.143.96:9092,SSL://172.26.143.96:9093

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL

#SSL

ssl.keystore.location=/home/peter/keystore/server.ks.p12

ssl.keystore.password=123456

ssl.key.password=123456

ssl.keystore.type=PKCS12

ssl.truststore.location=/home/peter/keystore/server.ts.p12

ssl.truststore.password=123456

ssl.truststore.type=PKCS12

ssl.client.auth=required

inter.broker.listener.name=PLAINTEXT

#security.inter.broker.protocol=PLAINTEXT

下面详细介绍一下这些属性:

listeners

listener属性是broker用来监听网络请求的监听器的列表。它的格式是:

listener_name_1://host_name_1:port_1,listener_name_2://host_name_2:port_2

listener_name_1:监听器名称,任意命名。并非为示例中的安全协议名称,那只是为了简便。listener.security.protocol.map属性定义了监听器和安全协议的映射。 host_name_1:IP地址。可以为空,为默认网卡的IP;可以为0.0.0.0,监听所有网卡。 port:监听的端口号。

advertised.listeners

按照字面意思理解,发布的监听器。通过Zookeeper发布,也就是告诉给其它broker,客户端程序我的监听器配置是什么,请通过它来与我通信。 它的格式是:

listener_name_1://host_name_1:port_1,listener_name_2://host_name_2:port_2

可以不设置此属性,默认使用listeners的配置listener_name不支持0.0.0.0的形式。如果listeners属性配置为0.0.0.0,那么就必须配置此属性。

例如在Zookeeper中查询到:

{

"features": {},

"listener_security_protocol_map": {

"PLAINTEXT": "PLAINTEXT",

"SSL": "SSL"

},

"endpoints": ["PLAINTEXT://172.26.143.96:9092,SSL://172.26.143.96:9093"],

"jmx_port": -1,

"port": 9092,

"host": "localhost",

"version": 5,

"timestamp": "1647337490945"

}

listener.security.protocol.map

监听器名称和安全协议之间的映射关系集合。 它的格式是:

listener_name_1:security_protocol_name_1,listener_name_2:security_protocol_name_2

security_protocol_name_1:4中协议中的一种:PLANTEXT,SSL,SASL_PLANTEXT,SASL_SSL。

inter.broker.listener.name

用于Broker之间通信的listener的名称。如果未设置,则listener名称由 security.inter.broker.protocol定义(security.inter.broker.protocol默认值是PLAINTEXT)。 同时设置 这个和 security.inter.broker.protocol 属性是错误的。

security.inter.broker.protocol

用于在代理之间进行通信的安全协议。 有效值为:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。

control.plane.listener.name

用于Controller和Broker之间通信的监听器名称, Broker将会使用control.plane.listener.name 来定位监听器列表中的EndPoint

如果未设置,则默认使用inter.broker.listener.name来通信,没有专门的链接。

Spring Boot客户端配置:

security.protocol: SSL

ssl.keystore.location: classpath:keystore/client.ks.p12

ssl.keystore.password: 123456

ssl.truststore.location: classpath:keystore/client.ts.p12

ssl.truststore.password: 123456

ssl.key.password: 123456

ssl.keystore.type: PKCS12

1.3 SASL_PLAINTEXT

无传输层加密,支持SASL客户端身份验证(一些SAAL机制也支持服务端身份验证),适用于私有网络。

1.4 SASL_SSL

支持传输层加密,带有SASL身份验证,适用于不安全网络,支持SASL客户端和SSL服务器端身份验证。 服务端server.properties:

listeners=PLAINTEXT://172.26.143.96:9092,SSL://172.26.143.96:9093,SASL_SSL://172.26.143.96:9094

advertised.listeners=PLAINTEXT://172.26.143.96:9092,SSL://172.26.143.96:9093,SASL_SSL://172.26.143.96:9094

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

#SSL

ssl.keystore.location=/home/peter/keystore/server.ks.p12

ssl.keystore.password=123456

ssl.key.password=123456

ssl.keystore.type=PKCS12

ssl.truststore.location=/home/peter/keystore/server.ts.p12

ssl.truststore.password=123456

ssl.truststore.type=PKCS12

ssl.client.auth=required

inter.broker.listener.name=PLAINTEXT

#security.inter.broker.protocol=PLAINTEXT

#SASL

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \

username="kafka" password="654321" user_kafka="654321" user_Peter="123456";

客户端springboot配置:

security.protocol: SSL

ssl.keystore.location: classpath:keystore/client.ks.p12

ssl.keystore.password: 123456

ssl.truststore.location: classpath:keystore/client.ts.p12

ssl.truststore.password: 123456

ssl.key.password: 123456

ssl.keystore.type: PKCS12

sasl:

mechanism: PLAIN

jaas:

config: org.apache.kafka.common.security.plain.PlainLoginModule required username="Peter" password="123456";

2 身份认证

身份验证是通过建立客户端和服务器端身份来验证客户端和服务器端真实性的过程。当客户端连接到首领broker时,可以通过服务器端身份验证来确定自己连接的就是真实的broker。在进行客户端身份验证时,服务器通过验证客户端的凭证(比如密码或数字证书)来确定Alice的身份,确保连接是来自Alice而不是冒充者。一旦通过身份验证,Alice的身份就与连接相关联,并在整个连接生命周期中起作用。

Kafka用KafkaPrincipal实例表示客户端身份,并用它授予资源访问权限,以及为具有这个客户端身份的连接分配配额。每个连接的KafkaPrincipal实例都是在身份验证过程中基于某种身份验证协议创建的。如果使用了基于密码的身份验证,那么user_Peter的主体就是User:user_Peter。

principal.builder.class 可以通过配置broker的principal.builder.class来自定义KafkaPrincipal,此参数的默认值是:org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder。 自定义主题构建类:

package com.qupeng.demo.kafka.kafkaapache.security;

import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;

import org.apache.kafka.common.security.kerberos.KerberosShortNamer;

import org.apache.kafka.common.security.ssl.SslPrincipalMapper;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

public class UserCertSubjectPrincipalBuilder extends DefaultKafkaPrincipalBuilder {

public static final String DEFAULT_MAPPER_RULE = "RULE:([cC][nN]=.*?),O=io.strimzi.*$/$1,O=io.strimzi/,RULE:([cC][nN]=.*?),([a-zA-Z]{1,2})=.*$/$1/,DEFAULT";

public UserCertSubjectPrincipalBuilder() {

super((KerberosShortNamer)null, new SslPrincipalMapper("RULE:([cC][nN]=.*?),O=io.strimzi.*$/$1,O=io.strimzi/,RULE:([cC][nN]=.*?),([a-zA-Z]{1,2})=.*$/$1/,DEFAULT"));

}

public UserCertSubjectPrincipalBuilder(KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) {

super(kerberosShortNamer, new SslPrincipalMapper("RULE:([cC][nN]=.*?),O=io.strimzi.*$/$1,O=io.strimzi/,RULE:([cC][nN]=.*?),([a-zA-Z]{1,2})=.*$/$1/,DEFAULT"));

}

}

匿名链接 User:ANONYMOUS 这个主体被用于未经身份验证的连接,包括PLAINTEXT监 听器接受的客户端连接和SSL监听器接受的未经身份验证的客户端连接。

2.1 SSL身份认证

如果监听器的安全协议配置的是SSL或SASL_SSL,那么监听器连接的安全传输层就会使用TLS。建立TLS连接需要一个握手过程,在这个过程中会执行身份验证、协商加密参数,并生成用于加密的共享密钥。客户端通过验证服务器的数字证书来确定服务器的身份。如果启用了SSL客户端身份验证,则服务器也会通过验证客户端数字证书来确定客户端的身份。所有的SSL流量都是加密的,适合用在不安全的网络中。

2.1.1 SSL的性能

因为SSL通道是加密的,所以会增加CPU方面的开销。SSL目前不支持零 复制传输。根据流量模式的不同,增加的开销可能会高达20%~30%。

2.1.2 配置客户端认证服务端

服务端需要配置密钥库,其中包含broker的私有密匙和证书。同时,客户端也需要配置一个信任存储,其中包含broker证书或签署了broker 证书的证书颁发机构(CA)的证书。 broker证书需要包含作为主体别名(subject alternative name,SAN)扩展或公用名称(common name,CN)的broker主机名,客户端可以用它验证服务器的主机名。也可以使用通配符证书,为同一域名下的所有broker使用相同的密钥存储,以此来简化配置。

服务器主机名验证 在默认情况下,Kafka客户端会验证保存在服务器证书中的服务器主机名与 客户端连接的主机名是否匹配。连接用的主机名既可以是在客户端配置的引 导服务器地址,也可以是broker通过元数据响应返回给客户端的监听器主 机名。主机名验证是服务器端身份验证的一个关键部分,可以防止中间人攻 击,因此在生产环境中不应该被禁用。

2.1.3 配置服务端认证客户端

ssl.client.auth=required

可以在broker 端配置 ssl.client.auth=required,让broker对SSL监听器接受的客户端 连接进行身份验证。客户端需要配置一个密钥存储;broker端需要配置一个信任存储,其中包含客户端证书或客户端证书的CA。如果broker间通信也启用了SSL,那么broker端的信任存储需要包含broker证书的CA和客户端证书的CA。在默认情况下,客户端证书中的可识别名称(distinguished name,DN)会被作为KafkaPrincipal,用于授权和配额。可以通过配置 ssl.principal.mapping.rules来自定义主体。配置了SASL_SSL的监听器将禁用TLS客户端身份验证,其安全性主要依赖SASL身份验证和由SASL创建的KafkaPrincipal.

ssl.client.auth=requested

如果配置了 ssl.client.auth=requested,那么SSL客户端身份验证就会变成 可选的。在这种情况下,没有配置密钥存储的客户端可以完成TLS握手,但 其主体是User:ANONYMOUS。

忽略信任存储 如果使用了由知名的受信任权威机构签署的证书,就可以省略broker和客户 端的信任存储配置。在这种情况下,将使用Java提供的默认信任存储来建立 信任。第2章介绍过相关的安装步骤。

2.1.4 更新秘钥

必须在证书过期前更新密钥和信任存储,避免TLS握手失败。可以通过直接修改存储文件或将配置参数指向新的存储文件来更新broker的SSL存储。在这两种情况下,都可以使用Admin API或Kafka配置工具来更新存储。下面的例子使用了Kafka配置工具来更新broker 0外部监听器的密钥存储。

bin/kafka-configs.sh -- bootstrap-server localhost:9092

-- command-config admin.props

-- entity-type brokers -- entity-name 0 -- alter -- add-config \

'listener.name.external.ssl.keystore.location=/path/to/server.ks.p12'

2.1.7 安全方面的考虑

使用TLS高版本

TLS广泛用于为多种协议(包括HTTPS)提供传输层安全性。Kafka默认只启用了较新的TLSv1.2和TLSv1.3,因为旧协议(如TLSv1.1)存在已知的漏洞。由于不安全重协商存在已知漏洞,因此Kafka不支持TLS连接重协商。

使用256位密钥

在不安全网络中传输数据时,可以使用至少256位的加密密钥来防止加密攻击,并确保数据的完整性。

密钥的安全保存

在默认情况下,包含私钥的密钥存储直接保存在文件系统中,所以非常有必要对文件系统的访问权限进行限制。如果私钥被泄露,那么可以使用标准的Java TLS特性来吊销证书。

缩短密钥寿命

还可以使用寿命较短的密钥来降低泄露的概率。

限制客户端配额

TLS握手过程开销巨大,并占用了broker大量的网络线程时间。对于不安全网络中的TLS监听器,要用连接配额来保护它们不受拒绝服务攻击,从而保证broker的可用性。

延迟发送客户端响应

broker的配置参数 connection.failed.authentication.delay.ms 可以用来延迟发送身份验证失败响应,以降低客户端对身份验证失败的重试速率。

2.2 SASL身份验证

Kafka协议支持使用SASL进行身份验证,并内置支持几种常用的SASL机制。SASL可以和合TLS作为传输层,提供一个有身份验证和加密的安全通道。SASL身份验证通过一系列服务器质询(challenge)和客户端响应来实现,其中SASL机制正义了质询和响应的序列和连接格式。broker支持以下几种SASL机制,并通过回调机制集成现有的安全基础设施。

GSSAPI SASL/GSSAPI 支持Kerberos 身份验证,并可以与Active Directory或OpenLDAP等 Kerberos 服务器集成。 PLAIN 用户名和密码身份验证通常与自定义服务器回调一起使用,用于验证保存在外部密码存储系统中的密码。 SCRAM-SHA-256 和SCRAM-SHA-512 Kafka提供的用户名和密码身份验证,不需要额外的密码存储。 OAUTHBEARER 使用OAuth承载令牌进行身份验证,通过自定义回调来获取和验证标准OAuth服务器 授予的令牌。

可以通过 sasl.enabled.mechanisms参数为启用了SASL的broker监听器配置一个或多个SASL机制。客户端可以通过配置sasl.mechanism参数来选择任何一种已被启用的机制。

2.2.1 JAAS

Kafka 使用Java认证和授权服务(JAAS)来配置SASL。配置参数 sasl.jaas.config包含了一个JAAS配置条目,其中指定了登录模块及相关参数。我们在配置 sasl.jaas.config时会将listener和mechanism作为前缀。

例如,listener.name.external.gssapi.sasl.jaas.config 表示为监听器EXTERNAL的SASL/GSSAPI机制配置JAAS条目。broker和客户端的登录过程将使用JAAS配置来确定用于身份验证的公共和私有凭证。

JAAS 配置文件 JAAS也可以配置在文件中,并通过Java的系统属性 java.security.auth.login.config来指定配置文件路径。不过,还是推荐使用Kafka的 sasl.jaas.config配置参数,因为它支持密码保护,而且如果监听器启用了多种机制,则可以对每种SASL机制进行单独的配置。

Kafka支持的SASL机制可以通过回调机制与第三方认证服务器集成。我们可以为broker或客户端提供一个登录回调,以此来自定义登录过程,例如,获取用于身份验证的凭证。

也可以提供一个服务器端回调,用它来验证客户端凭证,例如,通过一台外部密码服务器来验证密码。还可以提供一个客户端回调,将客户端凭证注入而不是包含在JAAS配置中。

下面将更详细地探讨Kafka支持的SASL机制。

2.2.2 SASL/GSSAPI

Kerberos 是一种被广泛使用的网络身份验证协议,它使用了强加密技术,支持在不安全网络中进行安全的双向身份验证。通用安全服务应用程序编程接口(GSS-API)是一个为使用了不同身份验证机制的应用程序提供安全服务的框架。RFC-4752将GSS-API的Kerberos V5身份验证机制引入到了SASL中。Kerberos服务器的开源和企业级实现让Vorberos 成为很多对女全性有严格要求的行业的选择。Kafka 通过SASL/GSSAPI支持Kerberos 身份验证。

2.2.2.1 设置SASL/GSSAPI

Kafka 使用包含在Java运行时环境中的GSSAPI提供程序来支持Kerberos 身份验证。

sasl.enabled.mechanisms=GSSAPI

listener.name.external.gssapi.sasl.jaas.config=|

com.sun.security.auth.module.Krb5LoginModule required \

useKeyTab=true storeKey=true

keyTab="/path/to/broker1.keytab" \

principal="kafka/broker1.example.com@EXAMPLE.COM";

如果broker间通信启用了SASL/GSSAPI,则也需要配置broker间的SASL 机制和Kerberos服务名。

sasl.mechanism.inter.broker.protocol=GSSAPI

sasl.kerberos.service.name=kafka

客户端需要在JAAS配置中指定它们自己的密钥表和主体,并用sasl.kerberos.service.name 指定它们想要连接的服务名。

sasl.mechanism=GSSAPI

sasl.kerberos.service.name=kafka

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \

useKeyTab=true storeKey=true

keyTab="/path/to/alice.keytab" \

principal="Alice@EXAMPLE.COM";

2.2.2.2 安全方面的考虑

如果在生产环境中使用了Kerberos,那么建议便用SASL_SSL来保护身份验证流程和经过身份验证之后的数据流量。如果不使用TLS米提供安全传输层,则取窃听者有可能可以获取到足够的信息来发动字典攻击或暴力破解工具,并窃取客户端任凭证。使用随机生成的密钥比使用基于密码生成的密钥更为安全。应该避免使用较弱的加空异法(如DES-MD5),并通过文件系统权限来限制对密钥表文件的访问,因为能够访问这个文件的用户都可能成为冒充者。

SASL/GSSAPI需要一个安全的DNS服务来进行服务器身份验证。由于针对KDC服务或DNS 服务的拒绝服务攻击有可能会导致客户端身份验证失败,因此有必要对这些服务的可用性进行监控。Kerberos还依赖可配置的松散同步时钟来检测重放攻击,所以要确保时钟同步是安全的。

2.2.3 SASL/PLAIN

RFC-4616定义了一个简单的用户名和密码身份验证机制,可以与TLS一起,为我们提供安全的身份验证。在进行身份验证期间,客户端会向服务器发送用户名和密码,服务器则会根据它的密码存储来验证密码。Kafka内置了SASL/PLAIN支持,我们可以通过一个自定义回调与外部密码数据库集成。

2.2.3.1 配置SASL/PLAIN

SASL/PLAIN的默认实现会将broker的JAAS配置作为密码存储。所有的客户端用户名和密码都包含在登录选项中,broker会验证客户端提供的密码是否与其中的一个条目相匹配。如果broker间通信启用了SASL/PLAIN,那么还需要提供broker的用户名和密码。

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

listener.name.external.plain.sasl.jaas.config=\

org.apache.kafka.common.security.plain.PlainLoginModule required \

username="kafka" password="kafka-password" \

user_kafka="kafka-password" \

user_Alice="Alice-password";

客户端也需要配置用户名和密码。

sasl.mechanism=PLAIN

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \

required username="Alice" password="Alice-password";

在JAAS配置文件中保存密码既不安全也不灵活,因为在添加或删除用户后需要重启所有的broker。

如果是在生产环境中使用SASL/PLAIN,那么可以使用自定义服务器回调将broker与安全的第三方密码服务器集成在一起。也可以使用自定义回调实现密码轮换。务器端的回调需要在轮换密码的重叠时间段内同时支持旧密码和新密码,直到所有客户新都切换到新密码。

下面是一个使用自定义回调的例子,我们对htpasswd生成的密码进行验证。

public class PasswordVerifier extends PlainServerCallbackHandler {

private final List passwdFiles = new ArrayList<>();

public void configure(Map configs, String mechanism,

List jaasEntries) {

Map loginOptions = jaasEntries.get(0).getOptions();

String files = (String) loginOptions.get("password.files");

Collections.addAll(passwdFiles, files.split(","));

}

@Override

protected boolean authenticate(String user, char[] password) {

return passwdFiles.stream()

.anyMatch(file -> authenticate(file, user, password));

}

private boolean authenticate (String file, String user,char[] password){

try {

String cmd = String.format("htpasswd -vb %s %s %s",

file, user, new String(password));

return Runtime.getRuntime().exec(cmd).waitFor() == 0;

} catch (Exception e) {

return false;

}

}

}

broker配置了密码验证回调处理器和其他选项。

listener.name.external.plain.sasl.jaas.config=\

org.apache.kafka.common.security.plain.PlainLoginModule required \

password.files="/path/to/htpassword.props,/path/to/oldhtpassword.props";

listener.name.external.plain.sasl.server.callback.handler.class=\

com.example.PasswordVerifier

在客户端,一个实现了 org.apache.kafka.common.security.auth.AuthenticateCallbackHandler的客户端回调处理器会在建立连接时动态加载密码,而不是在启动时从JAAS配置中加载静态密码。为了提高安全性,可以将密码保存在加密的文件或外部的安全服务器中。下面的例子使用配置类从文件中动态加载密码。

public class PasswordProvider implements AuthenticateCallbackHandler {

@Override

public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {

Properties props = Utils.loadProps("src/main/resources/password.properties"); // 在回调中加载配置文件,确保使用最新的密码来进行密码轮换。

PasswordConfig config = new PasswordConfig(props);

String user = config.getString("username");

String password = config.getPassword("password").value(); // 即使密码被存储在外部,底层的配置库也会返回实际的密码。

for (Callback callback: callbacks) {

if (callback instanceof NameCallback)

((NameCallback) callback).setName(user);

else if (callback instanceof PasswordCallback) {

((PasswordCallback) callback).setPassword(password.toCharArray());

}

}

}

private static class PasswordConfig extends AbstractConfig {

static ConfigDef CONFIG = new ConfigDef()

.define("username", STRING, HIGH, "User name")

.define("password", PASSWORD, HIGH, "User password"); // 将密码定义成PASSWORD类型,确保密码不会被包含在日志中。

PasswordConfig(Properties props) {

super(CONFIG, props, false);

}

}

}

客户端和为broker间通信启用了SASL/PLAIN的broker都可以配置客户端回调。

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \

required file="/path/to/credentials.props";

sasl.client.callback.handler.class=com.qupeng.demo.kafka.kafkaapache.security.PasswordProvider

2.2.3.2 安全方面的考虑

由于SASL/PLAIN会通过网络传输明文密码,因此PLAIN机制应该与SASL_SSL一起使用,以此来提供安全传输层。

2.2.4 SASL/SCRAM

RFC-5802中引入了一种安全的用户名和密码身份验证机制,解决了普通密码身份验证机制(如SASL/PLAIN)的安全问题。加盐质询响应认证机制(salted challenge response authentication mechanism, SCRAM)不传输明文密码,而是以一种无法被客户端冒充的格式保存密码。密云与一些随机数组合在一起(也就是所谓的加盐),并对其应用单向加密哈希函数。Kafka促供了一个内置的SCRAM实现,可以与安全的ZooKeeper部署在一起,不需要额外的密码服务器。Kafka的SCRAM机制支持SCRAM-SHA-256和SCRAM-SHA-512。

2.2.4.1 设置SASL/SCRAM

可以在启动ZooKeeper之后创建初始用户,然后再启动broker。broker会在启动期间将SCRAM用户元数据加载到内存中,确保所有用户(包括用于broker间通信的broker用户)都能成功地进行身份验证。我们可以随时添加或删除用户。

broker会使用基于ZooKeeper监听器的通知机制来更新缓存。下面的这个例子创建了一个用户,主体是User:Alice,密码是Alice-password,使用的机制是SCRAM-SHA-512。

$ bin/kafka-configs.sh -- zookeeper localhost:2181 -- alter -- add-config \

'SCRAM-SHA-512=[iterations=8192,password=Peter-password]'

-- entity-type users -- entity-name Peter

一个监听器可以配置一个或多个SCRAM机制。只有当监听器用于broker间通信时,才需要用到broker的用户名和密码。

sasl.enabled.mechanisms=SCRAM-SHA-512

sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

listener.name.external.scram-sha-512.sasl.jaas.config=\

org.apache.kafka.common.security.scram.ScramLoginModule required \

username="kafka" password="kafka-password";

客户端必须配置一个在broker端启用的SASL机制,而且客户端JAAS配置中必须包含用户名和密码。

sasl.mechanism=SCRAM-SHA-512

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule \

required username="Peter" password="Peter-password";

可以用配置管理工具的-- add-config选项添加新SCRAM用户,用-- delete-config选项删除用户。当现有用户被删除时,我们无法为这个用户建立新连接,但这个用户的现有连接可以继续使用。可以为broker配置一个重新认证时间间隔,用于限制用户被删除后可以继续使用现有连接的时间。下面的例子中删除了Alice的SCRAM-SHA-512机制的凭证。

$ bin/kafka-configs.sh -- zookeeper localhost:2181 -- alter -- delete-config |

'SCRAM-SHA-512' -- entity-type users -- entity-name Peter

2.2.4.2 安全方面的考虑

SCRAM在密码中加入了随机数,并对其应用了单向加密哈希函数,以避 免通过网络传输或在数据库中保存真实的密码。然而,任何基于密码的系统的安全性仅取决于密码本身。必须采用强密码策略,保护系统免受暴力或字典攻击。Kafka只支持强哈希算法 SHA-256和SHA-512,不采用较弱的SHA-1,从而提供了安全保障。这与默认的4096迭代次数和密钥随机盐相结合,限制了当ZooKeeper安全遭破坏时给Kafka造成的影响。

我们还是要为握手过程中传输的密钥和保存在ZooKeeper中的密钥提供额外的保护,以历止暴力破解攻击。SCRAM必须与SASL_SSL一起使用,避免窃听者在身份验证过程中获取哈希密钥。ZooKeeper也必须启用SSL,同时ZooKeeper数诺必须进行磁盘加密,确保存储的密钥即使在遭受攻击时也不会被获取到。如果没有与安全的ZooKeeper部署在一起,则可以使用SCRAM回调,并与安全的外部凭证存储系统集成在一起。

2.2.5 SASL/OAUTHBEARER

OAuth是一个用于限制应用程序访问HTTP服务的授权框架。RFC-7628定义了 OAUTHBEARER SASL 机制,可以用通过OAuth 2.0获取的凭证访问受保护的非HTTP协议资源。OAUTHBEARER使用的是寿命较短且资源访问受限的OAuth 2.0不记名令牌,避免了在长期密码机制中存在的安全漏洞。Kafka支持SASL/OAUTHBEARER 客户端身份验证,并可以与第三方OAuth服务器集成。OAUTHBEARER的内置实现使用了不安全的JSON Web Token(JWT),不适合被用在生产环境中。可以通过自定义回调将其与标准的OAuth服务器集成,以便在生产环境中使用OAUTHBEARER机制提供安全的身份验证。

2.2.5.1 配置SASL/OAUTHBEARER

Kafka内置的SASL/OAUTHBEARER实现不验证令牌,因此只需要在JAAS配置中指定登录模块。如果监听器被用于broker间通信,则还需要提供broker在发起客户端连接时需要用到的令牌的相关信息。unsecuredLoginStringClaim_sub指定了默认的主体声明。

sasl.enabled.mechanisms=OAUTHBEARER

sasl.mechanism.inter.broker.protocol=OAUTHBEARER

listener.name.external.oauthbearer.sasl.jaas.config=\

org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \

required unsecuredLoginStringClaim_sub="kafka"; # broker间连接使用的令牌的主体声明。

客户端必须配置主体声明选项unsecuredLoginStringClaim_sub。也可以配置其他声明和令牌生命周期。

sasl.mechanism=OAUTHBEARER

sasl.jaas.config=\

org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \

required unsecuredLoginStringClaim_sub="Peter"; // User:Peter是默认的KafkaPrincipal

为了将Kafka与第三方OAuth服务器集成,以便在生产环境中使用不记名令牌,Kafka客户端必须配置 sasl.login.callback.handler.class,通过长期密码或刷新令牌从OAuth服务器获取令牌。如果broker间通信启用了OAUTHBEARER,则broker也需要配置一个登录回调处理器,用于获取broker在发起客户端连接时需要用到的令牌。

public class OAuthTokenProvider implements AuthenticateCallbackHandler {

@Override

public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {

OAuthBearerToken token = null;

for (Callback callback : callbacks) {

if (callback instanceof OAuthBearerTokenCallback) {

token = acquireToken(); // 客户端必须从OAuth服务器获取一个令牌,并将其传给回调。

((OAuthBearerTokenCallback) callback).token(token);

} else if (callback instanceof SaslExtensionsCallback) { // 客户端可能还用了其他回调。

((SaslExtensionsCallback) callback).extensions(processExtensions(token));

} else {

throw new UnsupportedCallbackException(callback);

}

}

}

private SaslExtensions processExtensions(OAuthBearerToken token) {

return null;

}

private OAuthBearerToken acquireToken() {

return null;

}

}

broker还必须配置listener.name.external.oauthbearer.sasl.server.callback.handler.class,用于验证客户端提供的令牌。

public class OAuthTokenVerifier implements AuthenticateCallbackHandler {

@Override

public void handle(Callback[] callbacks) throws UnsupportedCallbackException {

for (Callback callback : callbacks) {

if (callback instanceof OAuthBearerValidatorCallback) {

OAuthBearerValidatorCallback cb = (OAuthBearerValidatorCallback) callback;

try {

cb.token(validatedToken(cb.tokenValue())); // OAuthBearerValidatorCallback包含来自客户端的令牌,broker会验证这个令牌。

} catch (OAuthBearerIllegalTokenException e) {

OAuthBearerValidationResult r = e.reason();

cb.error(errorStatus(г), г.failureScope(), r.failure0penIdConfig());

}

} else if (callback instanceof OAuthBearerExtensionsValidatorCallback) {

OAuthBearerExtensionsValidatorCallback ecb =

(OAuthBearerExtensionsValidatorCallback) callback;

ecb.inputExtensions().map().forEach((k, v) ->

ecb.valid(validateExtension(k, v))); // broker会验证来自客户端的可选扩展。

} else {

throw new UnsupportedCallbackException(callback);

}

}

}

private OAuthBearerToken validatedToken(String s) {

return null;

}

}

2.2.5.2 安全方面的考虑

由于SASL/OAUTHBEARER客户端通过网络发送OAuth 2.0不记名令牌,这些令牌可能会被用于冒充客户端,因此必须启用TLS来加密身份验证流量。如果令牌遭到破坏,那么可以使用短期令牌来限制泄露范围。可以开启broker的重新认证,防止连接的存活时间超过令牌有效期。为broker配置重新认证时间间隔,并与可撤销令牌相结合,有效限制了已有连接被撤销后可以继续使用令牌的时间。

2.2.6 委托令牌

委托令牌是一种在broker和客户端之间共享的密钥,它提供了一种轻量级的配置方式,无须向客户端应用程序分发SSL密钥存储或Kerberos密钥表。可以用委托令牌来减少身份验址服务器(如Kerberos密钥分发中心,KDC)的负载。一些框架(如Kafka Connect)可以用委托令牌来简化worker的安全配置。一个已经通过broker身份验证的客户端可以为同一个用户主体创建委托令牌,并将它们分发给worker,worker可以直接用这些令牌通过broker的身份验证。每个委托令牌由令牌标识符和作为共享秘密使用的哈希消息验证码(HMAC)组成。基于委托令牌的客户端身份验证使用的是SASL/SCRAM机制,令牌标识符就是用户名,HMAC就是密码。

可以使用Kafka Admin API或delegation-tokens命令创建或更新委托令牌。要为主体 User:Peter创建委托令牌,必须先用Alice的凭据对客户端进行身份验证(除委托令牌之外的任何一种身份验证协议)。使用委托令牌进行身份验证的客户端不能创建其他委托令牌。

$ bin/kafka-delegation-tokens.sh -- bootstrap-server localhost:9092 |

-- command-config admin.props -- create -- max-life-time-period -1 \

# 如果Peter运行了这个命令,那么生成的令牌就可以用来冒充Peter。这个令牌的所有者是User:Peter。我们将User:Bob配置为令牌的续订者。

$ bin/kafka-delegation-tokens.sh -- bootstrap-server localhost:9092 |

-- command-config admin.props -- renew -- renew-time-period -1 -- hmac c2Vjcmve #更新命令可以由令牌所有者(Peter)或令牌续订者(Bob)来执行。

2.2.6.1 配置委托令牌

要创建和验证委托令牌,所有broker都必须使用 delegation.token.master.key 参数配置相同的主密钥。这个主密钥只能通过重启所有broker进行轮换。所有已有的令牌都必须在更新主密钥之前删除,因为它们不会再被使用,应该在所有broker都更新了密钥之后再创建新令牌。

要用委托令牌进行身份验证,broker至少要启用一种SASL/SCRAM机制。客户端也需要配置SCRAM机制,将令牌标识符作为用户名,HMAC作为密码。KafkaPrincipal就是与令牌相关联的原始主体,比如User:Alice。

sasl.mechanism=SCRAM-SHA-512

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule \

required tokenauth="true" username="MTIz" password="c2VjcmV0"; #用带有 tokenauth的SCRAM机制配置委托令牌。

2.2.6.2 安全方面的考虑

与内置的SCRAM实现一样,委托令牌只适合用于有安全ZooKeeper的 环境中。SCRAM的所有安全注意事项也都适用于委托令牌。

用于生成令牌的主密钥必须通过加密或外部化到安全密码存储的方式进行保护。如果令牌遭到破坏,则可以用短期委托令牌来限制泄露范围。可以开启broker的重新认证,防止连接使用过期的令牌,并限制已有连接在令牌被删除后继续存在的时间。

2.7 重新认证

当客户端向broker发起连接时,broker会对客户端进行身份验证。broker会验证客户端凭证,如果凭证有效,验证就通过。一些安全机制(比如Kerberos和OAuth)使用的凭证的生存期是有限的。Kafka的一个后台登录线程负责在旧凭证过期之前获取新凭证,但在默认情况下,新凭证只用于验证新连接。使用旧凭证验证的旧连接将继续处理请求,直到因为某些原因(比如请求超时、空闲超时或网络错误)断开连接。在用于验证连接的凭证过长寿命连接可能会继续处理请求。可以通过配置connections.max.reauth.ms参数让broker进行重新认证。

connections.max.reauth.ms=0

可以添加监听器和认证机制作为前缀,例如:listener.name.<_ listener-name_>.oauthbearer.connections.max.reauth.ms=3600000 如果这个参数的值被设置为正整数,那么broker就会在进行SASL 握手期间将SASL连接的会话生存期告诉客户端。会话生存期是凭证剩余生存期和ronnections.max.reauth.ms二者当中的较小者。任何在这个间隔内没有进行重新认证的连接都将被broker终止。客户端可以使用后台登录线程获取的最新凭证或自定义回调注入的最新凭据来执行重新认证。

重新认证可在以下几种场景中加强安全性。

对使用了有限生存期凭证的SASL机制(比如GSSAPI和OAUTHBEARER)来说,重新认证可以保证所有活动连接都与有效凭证相关联。短期凭证可以在遭到破坏时限制泄露范围。基于密码的SASL机制(比如PLAIN和SCRAM)可以通过定期登录来实现密码轮换。重新认证可以限制使用旧密码进行身份验证的连接在密码轮换后继续处理请求的时间。 自定义服务器回调允许在一段时间内同时使用新密码和旧密码,这样就可以避免在所有客户端迁移到新密码之前造成停机。设置 connections.max.reauth.ms参数,强制所有SASL机制进行重新认证,包括那些凭证未过期的机制。这样就可以限制活动连接在凭证被撤销后继续存在的时间。不支持SASL重新认证的客户端连接会在会话过期时被终止,客户端必须重新连接并进行身份验证,从而提供与凭证过期或凭证被撤销时同样的安全保证。

被泄露的用户身份 如果用户身份被泄露,则必须尽快将其从系统中移除。一旦用户被从认证服务器上移除,该用户所有的新连接都将无法通过broker的身份验证。已有连接将继续处理请求,直到执行下一次重新认证。如果没有配置connections.max.reauth.ms,就不会进行重新认证,已有连接可能会在很长一段时间内继续使用被泄露的用户身份。Kafka不支持SSL重新协商,因为旧SSL协议的重新协商过程存在已知的漏洞。较新的协议(如TLSv1.3)不支持重新协商。 因此,已有SSL连接可以继续使用已撤销或过期的证书。可以用用户主体的 Deny ACL 来阻止这些连接继续执行操作。因为ACL变更可以很快被应用于 所有broker,所以这是禁用被泄露用户访问权限最快的方法。

2.8 安全更新不停机

Kafka需要定期轮换密钥、应用安全补丁以及更新到最新的安全协议。大部分维护任务是通过滚动更新的方式进行的,也就是用新的配置逐个重启broker,而像更新SSL密钥存储和信任存储这类任务则可以通过动态配置更新来执行,无须重启broker。 在向已有集群添加新的安全协议时,可以在保留旧监听器的同时加入新监听器,确保在更新安全协议期间客户端可以继续使用旧监听器。例如,可以按照下面的顺序将已有集群的PLAINTEXT 切换到SASL_SSL。

使用Kafka配置工具为每一个broker的新端口添加一个新监听器。使用单独的配置更新命令更新listeners和advertised.listeners,让它们同时包含旧监听器和新监听器,并用监听器前缀为新的SASL_SSL监听器提供所有的配置参数。期之后,2. 修改所有的客户端应用程序,让它们使用新的SASL_SSL监听器。如果broker间通信也使用了新的SASL_SSL监听器,则需要用新的inter.broker listener.name 滚动更新所有broker。使用配置工具删除listeners和advertised.listeners中的监听器,并删除未被使用 的旧监听器配置参数。

在向已有SASL监听器添加SASL机制或从已有SASL监听备中删除SASL机制时,可以在同一个监听器端口上进行滚动更新,以此来避免停机。可以按照下面的顺序将PLAIN切换到SCRAM-SHA-256。 4. 使用Kafka配置工具将所有已有用户添加到SCRAM存储中。 5. 设置 sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,并为监听器配置 listener.name.<_ listener-name_>.scram-sha-256.sasl.jaas.config,然后进行broker滚动更新。 6. 修改所有的客户端应用程序,设置sasl.mechanism=SCRAM-SHA-256,并使用SCRAM更新 sasl.jaas.config。 7. 如果监听器被用于broker间通信,则需要设置sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256,并进行broker滚动更新。 8. 移除PLAIN机制,再进行一次broker滚动更新。设置sasl.enabled.mechanisms=SCRAM-SHA-256,并移除 listener.name .< listener-name>.plain.sasl.jaas.config以及与PLAIN相关的配置参数。

3 加密

加密被用于保护数据的隐私和完整性。

3.1 传输层加密

启用了SSL和SASL_SSL安全协议的Kafka监听器将TLS作为传输层,提供了安全的加密通道,以用来保护在不安全网络上传输的数据。

3.2 数据存储加密

还需要采取额外的措施来保护静态数据,确保有Kafka日志存储磁盘物理访问权限的用户也无法检索敏感数据。物理磁盘可以采用全磁盘加密或数据卷加密,即使磁盘被盗,也可以避免数据泄露。

3.3 端到端加密

虽然传输层和数据存储加密已经提供了足够的保护,但仍然需要提供额外的保护措施,避免将自动数据访问权限授予平台管理员。broker内存中未加密的数据可能会出现在堆转储中,有磁盘访问权限的管理员可以直接访问这些数据和包含潜在敏感数据的Kafka日志。

如果集群中有高度敏感的数据或PII,则需要采取额外措施来保护数据隐私。为了符合监管合规性,特别是在云端,需要确保平台管理员或云供应商不能以任何方式访问机密数据。可以将自定义加密提供程序内嵌到Kafka客户端中,以此来实现端到端加密,保证整个数据流都是加密的。

PII(Personally Identifiable Information,个人可识别信息)是指可以用来唯一识别、联系或定位个人的数据。这些数据包括但不限于姓名、地址、电话号码、电子邮件地址、社会安全号码等。

3.3.1 端到端加密流程

可以将序列化器和反序列化器与加密库集成,在序列化期间进行消息加密,在反序列化期间进行消息解密。消息加密通常使用对称加密算法,比如AES。生产者可以用保存在密钥管理系统(KMS)中的共享加密密钥加密消息,消费者可以用它解密消息。broker不需要访问加密密钥,并且永远看不到消息里未加密的内容,因此在云端使用这种方法是安全的。解密消息所需的加密参数可以保存在消息标头或消息体(针对不支持消息标头的旧客户端)中。还可以在消息标头中包含数字签名,用于验证消息的完整性。

例如一个端到端加密数据流:

使用Kafka生产者发送了一条消息。生产者会使用来自KMS的加密密钥加密消息。加密的消息被发送给broker。broker会将加密的消息保存在分区日志中。broker会将加密的消息发送给消费者。消费者会使用来自KMS的加密密钥解密消息。

3.3.2 密钥管理

生产者和消费者必须配置凭证才能从KMS获取共享密钥。建议定期轮换密钥,以此来加强安全性,因为频繁轮换密钥可以限制被泄露的消息的数量,还可以防止暴力破解攻击。如果使用旧密钥加密的消息仍然符合保留策略,那么新旧密钥都必须可用。有很多KMS系统支持优雅的对称加密密钥轮换,无须在Kafka客户端做任何特殊处理。对于压实型主题,使用旧密钥加密的消息可能会被保留很长时间,而且可能需要对旧消息进行重新加密。为了避免新消息对其造成干扰,生产者和消费者在这个过程中需要离线。

3.3.3 压缩

压缩加密的消息 与加密前压缩消息相比,加密后压缩消息并不会在存储空间方面提供任何优 势。可以通过配置让序列化器在加密消息之前压缩消息,或者让应用程序在 生产消息之前压缩消息。不管是哪一种情况,最好都禁用Kafka的压缩,因 为它们增加了开销却不会带来任何额外的好处。对于在不安全传输层上传输 的消息,还需要考虑已压缩的加密消息的安全传输问题。

3.3.4 消息键加密

在很多环境中,特别是在使用TLS作为传输层时,消息键不需要加密,因为它们通常不 会像消息体那样包含敏感数据。但在某些情况下,明文的消息键可能不符合监管要求。由于在分区和压实时会用到消息的键,因此在对键进行转换时必须保留等价哈希,确保即使加密参数发生变化,键的哈希值也不会变。一种方法是将原始消息键的安全哈希作为消息键,并将加密的消息键保存在消息体或消息标头中。因为消息键和消息体的序列化是独立进行的,所以可以用生产者拦截器来执行这个转换。

4 授权

授权是决定你可以对哪些资源执行哪些操作的过程。broker会使用一个可定制的授权器进行访问控制管理。前面讲过,每当客户端向broker发起连接时,broker会对客户端进行身份验证,并将代表客户端身份的KafkaPrincipal与连接关联起来。在处理请求时,broker会验证与当前连接关联的主体是否被授权执行这个请求。

Kafka 提供了一个内置的授权器AclAuthorizer,我们可以通过下面的配置来启用它。 authorizer.class.name=kafka.security.authorizer.AclAuthorizer

SimpleAclAuthorizer AclAuthorizer 是在Kafka 2.3中引入的。从Kafka 0.9.0.0开始的旧版本中有 一个内置的授权器 kafka.security.auth.SimpleAclAuthorizer,虽然该授权 器已被弃用,但仍受支持。

AclAuthorizer 支持使用访问控制列表(ACL)对Kafka资源进行细粒度的访问控制。ACL保存在ZooKeeper中,broker也会将它们缓存在内存中,以便在授权请求时提供高性能的查找。broker在启动时会将ACL加载到缓存中,并通过ZooKeeper 的watcher通知机制来更新缓存。在授权请求时,broker会验证与当前连接关联的KafkaPrincipal是否有权限操作被请求的资源。

每个ACL由以下几部分组成:

资源类型:Cluster|Topic|Group|TransactionalId|DelegationToken。模式类型:Literal|Prefixed。资源名称:资源的名称、前缀或通配符*。操作:Describe|Create|Delete|Alter|Read|Write|DescribeConfigs|AlterConfigs。权限类型:Allow|Deny,其中Deny的优先级更高。主体:主体可以表示为<主体类型> :< 主体名称>,例如,User:Bob或Group:Sales。 如果要授予所有用户访问权限,那么可以使用User:*。主机:客户端连接的源IP地址。如果要为所有主机授权,那么可以使用*。

如果一个动作没有匹配的Deny ACL,并且至少有一个匹配的Allow ACL,那么 AclAuthorizer将对这个动作进行授权。如果授予了Read权限、Write权限、Alter权限或Delete权限,那么也会隐含授予Describe权限;如果授予了AlterConfigs权限,那么也会隐含授予DescribeConfigs权限。

Kafka 提供了一个使用broker中的授权器管理ACL的工具。

$KAFKA_HOME/bin/kafka-acls.sh --bootstrap-server 172.26.143.96:9092 --add --resource-pattern-type Literal --topic product Write --allow-principal User:Peter

$KAFKA_HOME/bin/kafka-acls.sh --bootstrap-server 172.26.143.96:9092 --add --topic product Describe --allow-principal User:ANONYMOUS

也可以直接在ZooKeeper中创建ACL,如果需要在启动broker之前创建ACL,那么这就非常有用。

$KAFKA_HOME/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \

--add --topic product \

--resource-pattern-type LITERAL \

--operation Write \

--allow-principal User:Peter

$KAFKA_HOME/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \

--add --topic test.* \

--resource-pattern-type PREFIXED \

--producer #producer 是一种生产端的方便式写法,相当于 --operation WRITE --operation CREATE,包括的权限有:WRITE、CREATE

--allow-principal User:Peter

$KAFKA_HOME/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \

--add --topic product Describe \

--allow-principal User:ANONYMOUS

$KAFKA_HOME/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \

--add \

--consumer \ #consumer 是一种消费端的方便式写法,相当于 --operation READ ,包括的READ

--topic test.1 \

--group c1 \

--allow-principal User:consumer

$KAFKA_HOME/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \

--add \

--operation Read \

--operation Describe \

--group consumers \

--resource-pattern-type LITERAL \

--allow-principal User:consumer

AclAuthorizer 提供了两个配置选项,可用于对资源或主体进行大范围的授权,以此来简化ACL管理,特别是在第一次为已有集群添加授权时。服务端server.properties:

#Authorization

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

#超级用户可以访问所有资源,不受任何限制,而且不能用Deny ACL禁止超级用户的权限。

#如果admin的凭证被泄露,则必须将admin从super.users中移除,然后重启broker,让变更重生效。在生产环境中,通过ACL为用户授予特定权限会更为安全,因为在必要时可以

#与Kafka中其他使用逗号分隔的配置参数不同,super.users使用分号来分隔多个用户,因为用户主体(比如SSL证书的可识别名称)通常会包含逗号。

super.users=User:admin;User:MaYun

#如果allow.everyone.if.no.acl.found被设置为true,那么所有用户都可以访问没有配置ACL的资源。如果是第一次在集群中启用授权或在开发过程中启用授权,那么这个可能很有用,但不适合用在生产环境中,因为这样有可能在无意中授予了访问新资源的权限。而且,如果添加了与前缀或通配符匹配的ACL,则不再满足no.acl.found条件,访问权限有可能被意外移除。

allow.everyone.if.no.acl.found=true

4.1 自定义授权

在Kafka中,可以自定义授权这样就可以实,现额外的控制或增加新的访问控制类型,比如基于角色的访问控制。 下面的自定义授权器限制了只有内部监听器可以处理某些请求。

package com.qupeng.demo.kafka.kafkaapache.security;

import kafka.security.authorizer.AclAuthorizer;

import org.apache.kafka.common.utils.Utils;

import org.apache.kafka.server.authorizer.Action;

import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

import org.apache.kafka.server.authorizer.AuthorizationResult;

import java.util.Collections;

import java.util.List;

import java.util.Set;

import static org.apache.kafka.common.protocol.ApiKeys.CREATE_ACLS;

import static org.apache.kafka.common.protocol.ApiKeys.DELETE_ACLS;

import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;

public class CustomAuthorizer extends AclAuthorizer {

private static final Set internalOps = Utils.mkSet(CREATE_ACLS.id, DELETE_ACLS.id);

private static final String internalListener = "INTERNAL";

@Override

public List authorize(AuthorizableRequestContext context, List actions) {

// 如果不是内部请求,并且ACL是创建和删除的操作,禁止执行

if (!context.listenerName().equals(internalListener) &&

internalOps.contains((short) context.requestType())) {

return Collections.nCopies(actions.size(), DENIED);

} else {

// 如果是内部请求,执行默认的授权逻辑

return super.authorize(context, actions);

}

}

}

4.2 安全方面的考虑

ZooKeeper安全 因为AclAuthorizer将ACL保存在ZooKeeper中,所以要对ZooKeeper的访问进行安全限制。如果集群中没有安全的ZooKeeper,则可以实现一个自定义授权器,将ACL保存在安全的外部数据库中。合理使用前缀 在拥有大量用户的大型组织中,管理单个资源的ACL可能会非常烦琐。可以为不同的部门预留不同的资源前缀,这样就可以使用前缀ACL,以此来减少ACL的数量。还可以结合使用基于组或角色的ACL(如前面的例子所示),以进一步简化大型组织的访问控制配置。最小权限原则 当用户受到攻击时,基于最小权限原则来限制用户访问可以降低暴露风险。也就是说,我们只为每个用户主体授予执行其操作所需的资源的访问权限,并在ACL使用完毕之后将其移除。及时清理主体 当用户主体不再被使用(例如,当一个员工离开组织)时,应该立即移除对应的ACL。使用服务凭证 对于长时间运行的应用程序,可以使用服务凭证(而不是与特定用户关联的凭证),以避免员工离开组织时出现任何中断。使用Deny ACL 由于一个用户主体的长寿命连接有可能在用户被删除之后继续处理请求,因此可以用Deny ACL来确保主体不会因通配符ACL而无意中被授予了访问权限。避免重用主体 如果可能,要避免重用主体,防止将访问权限授予与旧版本主体关联的连接。

5 审计

broker 可以生成用于审计和调试的log4j日志。可以在log4j.properties文件中指定日志级别、日志记录器及其配置选项。用于授权日志的kafka.authorizer.logger和用于请求日志的kafka.request.logger 可以单独配置日志级别和保留策略。在生产环境中,可以用一些框架(如Elastic Stack)来分析和可视化这些日志。

授权器会记录INFO级别的拒绝访问日志和DEBUG级别的授权访问日志。

DEBUG Principal = User:Alice is Allowed Operation = Write from host = 127.0.0.1

on resource = Topic:LITERAL:customerOrders for request = Produce with resourceR-

efCount =1 (kafka.authorizer.logger)

INFO Principal = User:Mallory is Denied Operation = Describe from host =

10.0.0.13 on resource = Topic:LITERAL:customerOrders for request = Metadata

with resourceRefCount = 1 (kafka.authorizer.logger)

DEBUG级别的请求日志中包含了用户主体和客户端主机的相关信息。如果请求日志设置了TRACE级别,那么日志中也会包含请求细节。

DEBUG Completed request:RequestHeader(apikey=PRODUCE, apiVersion=8,

clientId=producer-1, correlationId=6) --

{acks =- 1,timeout=30000,partitionSizes=[customerOrders-0=15514]},response:

{responses=[{topic=customerOrders,partition_responses=[{partition=0,error_code=0

,base_offset=13,log_append_time =- 1,log_start_offset=0,record_errors=[],error_mes

sage=null}]}],throttle_time_ms=0} from connection

127.0.0.1:9094-127.0.0.1:61040-0;totalTime:2.42,requestQueueTime:0.112,local-

Time:2.15,remoteTime:0.0,throttleTime:0,responseQueueTime:0.04,sendTime:

0.118, securityProtocol: SASL_SSL,principal:User: Alice, listener : SASL_SSL, clientInf

ormation:ClientInformation(softwareName=apache-kafka-java,

softwareVersion=2.7.0-SNAPSHOT) (kafka.request.logger)

可以通过分析授权器和请求日志来检测可疑活动。身份验证失败率指标和授权失败日志对审计来说非常有用,而且在遭到攻击或发生未授权访问时可以为我们提供有价值的信息。为了能够进行端到端的消息审计和追踪,可以在生成消息时将审计元数据放在消息标头中。端到端加密可用于保护元数据的完整性。

6 配额

请参考: https://blog.csdn.net/yunyun1886358/article/details/135434017

7 Zookeeper安全

用于维护Kafka集群可用性的元数据就保存在ZooKeeper中,所以,除了保护Kafka,也 很有必要保护ZooKeeper。ZooKeeper支持基于SASL/GSSAPI的Kerberos身份验证和基于SASL/DIGEST-MD5的用户名和密码身份验证。ZooKeeper 3.5.0中加入了对TLS的支持,可以进行双向身份验证和传输数据加密。需要注意是,SASL/DIGEST-MD5应该与TLS加密一起使用,并且不适合用在生产环境中,因为它存在已知的漏洞。

7.1 SASL

ZooKeeper的SASL是通过Java系统属性 java.security.auth.login.config来配置的。 这个属性必须指向JAAS配置文件,其中包含了一个登录模块以及与ZooKeeper服务器相关的配置参数。broker必须配置ZooKeeper客户端登录模块,以便与启用了SASL的ZooKeeper服务器通信。下面的ZooKeeper服务器端JAAS配置启用了Kerberos身份验证。

Server {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true storeKey=true

keyTab="/path/to/zk.keytab"

principal="zookeeper/zk1.example.com@EXAMPLE.COM";

}:

要启用ZooKeeper服务器端的SASL 身份验证,需要在ZooKeeper配置文件中配置身份验证提供程序。

authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

kerberos.removeHostFromPrincipal=true

kerberos.removeRealmFromPrincipal=true

broker 主体 在默认情况下,ZooKeeper用完整的Kerberos主体(如kafka/broker1.example.com@EXAMPLE.COM)来识别客户端。如果ZooKeeper的身份验证启用了ACL,那么ZooKeeper服务器端需要配置 kerberos.removeHostFromPrincipal=true和 kerberos.removeRealmFromPrincipal=true,确保所有的broker都有相同的主体。

Kafka broker 需要启用到ZooKeeper的SASL身份验证,为此需要使用包含客户端凭证的JAAS配置文件。

Client {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true storeKey=true

keyTab="/path/to/broker1.keytab"

principal="kafka/broker1.example.com@EXAMPLE.COM";

};

7.2 SSL

ZooKeeper的每一个端点都应该启用SSL包括使用SASL身份验证的那些。与Kafka一样,启用客户端身份验证需要配置SSL,但与Kafka不同,ZooKeeper会用SASL和SSL两种协议进行客户端身份验证,并将多个主体与连接关联起来。如果任意一个与某个连接关联的主体有访问某个资源的权限,那么ZooKeeper就会授予对这个资源的访问权限。 要为ZooKeeper服务器配置SSL,需要提供一个包含服务器主机名或者通配符主机名的密钥存储。如果启用了客户端身份验证,则还需要提供一个用于验证客户端证书的信任存储。

secureClientPort=2181

serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory

authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider

ssl.keyStore.location=/path/to/zk.ks.p12

ssl.keyStore. password=zk-ks-password

ssl.keyStore.type=PKCS12

ssl.trustStore.location=/path/to/zk.ts.p12

ssl.trustStore.password=zk-ts-password

ssl.trustStore.type=PKCS12

要为Kafka配置SSL以便连接到ZooKeeper,需要提供一个用于验证ZooKeeper证书的信任存储。如果启用了客户端身份验证,则还需要提供一个密钥存储。

zookeeper.ssl.client.enable=true

zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty

zookeeper.ssl.keystore.location=/path/to/zkclient.ks.p12

zookeeper.ssl.keystore.password=zkclient-ks-password

zookeeper.ssl.keystore.type=PKCS12

zookeeper.ssl.truststore.location=/path/to/zkclient.ts.p12

zookeeper.ssl.truststore.password=zkclient-ts-password

zookeeper.ssl.truststore.type=PKCS12

7.3 授权

可以通过设置路径ACL来启用ZooKeeper节点授权。如果broker配置了 zookeeper.set.acl=true,就会在创建节点时设置ZooKeeper节点ACL。在默认情况下,元数据节点是可公开访问的,但只有broker能修改。如果内部管理员用户需要直接通过ZooKeeper修改元数据,那么需要添加额外的ACL。敏感路径(比如包含SCRAM凭证的节点)在默认情况下是不公开的。

8 保护平台

前文讨论了如何保护Kafka和ZooKeeper。生产环境的系统所采用的安全威胁模型不仅要涵盖单个组件的安全威胁,还要涵盖整个系统的安全威胁。威胁模型提供了一个系统抽象,用于识别潜在的威胁和相关风险。在评估、记录并基于风险等级确定了威胁优先级之后,必须实施针对每个潜在威胁的缓解策略,以确保整个系统的安全。在评估潜在威胁时,需要考虑外部威胁和内部威胁。对于存储PII或其他敏感数据的系统,还必须实施符合监管政策的措施。不过,与威胁模型建模技术相关的内容超出了本章的讨论范围。 除了用安全的身份验证、授权和加密来保护Kafka中的数据和ZooKeeper中的元数据,还必须采取额外的措施来确保平台的安全性。可以用网络防火墙来保护网络,用加密来保护物理存储,用文件系统权限来保护包含身份验证凭证的密钥存储、信任存储和Kerberos密钥表文件。必须对包含安全关键信息(如凭证)的配置文件进行访问限制。由于在配置文件中保存明文密码是不安全的(即使对访问权限进行了限制),因此Kafka支持将密码外部化到安全存储中。

8.1 保护密码

可以为broker和客户端配置提供程序,用于从安全的第三方密码存储库获取密码。也可以将密码加密后保存在配置文件中,并提供用于解密的提供程序。

下面的提供程序使用gpg来解密保存在文件中的broker或客户端属性。

package com.qupeng.demo.kafka.kafkaapache.security;

import org.apache.kafka.common.config.ConfigData;

import org.apache.kafka.common.config.provider.ConfigProvider;

import org.apache.kafka.common.utils.Shell;

import java.io.IOException;

import java.io.StringReader;

import java.util.HashMap;

import java.util.Map;

import java.util.Properties;

import java.util.Set;

import java.util.stream.Collectors;

public class GpgProvider implements ConfigProvider {

@Override

public void configure(Map configs) {

}

@Override

public void close() {

}

@Override

public ConfigData get(String path) {

try {

String passphrase = System.getenv("PASSPHRASE"); //通过环境变量PASSPHRASE来提供用于解密密码的密码短语。

String data = Shell.execCommand( //用gpg解密配置文件。返回值中包含了全部的解密配置参数。

"gpg", " -- decrypt", " -- passphrase", passphrase, path);

Properties props = new Properties();

props.load(new StringReader(data)); //将配置数据解析成Java属性。

Map map = new HashMap<>();

for (String name : props.stringPropertyNames())

map.put(name, props.getProperty(name));

return new ConfigData(map);

} catch (IOException e) {

throw new RuntimeException(e); //如果遇到错误,就抛出RuntimeException异常。

}

}

@Override

public ConfigData get(String path, Set keys) { //调用者可能想要获取指定文件中的部分属性,我们在这里加载所有属性,并返回调用者请求的部分。

ConfigData configData = get(path);

Map data = configData.data().entrySet()

.stream().filter(e -> keys.contains(e.getKey()))

.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return new ConfigData(data, configData.ttl());

}

}

可以用gpg来加密凭证文件:

gpg -- symmetric -- output credentials.props.gpg \

-- passphrase "$PASSPHRASE" credentials.props

现在在原始属性文件中添加间接配置和提供程序,这样Kafka客户端就可以从被加密的文件中加载它们的凭证了。

username=${gpg:/path/to/credentials.props.gpg:username}

password=${gpg:/path/to/credentials.props.gpg:password}

config.providers=gpg

config.providers.gpg.class=com.example.GpgProvider

也可以用Kafka配置工具将加密过的敏感broker配置信息保存在ZooKeeper中。可以在启动broker之前执行如下命令,以将加密过的SSL密钥存储密码保存到ZooKeeper中。必须在每个broker的配置文件中配置用于解密的加密密钥。

$ bin/kafka-configs.sh -- zookeeper localhost:2181 -- alter |

-- entity-type brokers -- entity-name 0 -- add-config

'listener.name.external.ssl.keystore.password=server-ks-

password,password.encoder.secret=encoder-secret'

附录

1 JAAS

JAAS(Java Authentication Authorization Service),即 Java 认证与授权,使用可插拔方式将认证与授权服务和应用程序分离开,提供了灵活和可伸缩的机制来保证客户端或服务器端的 Java 程序;本文主要介绍 JAAS 的基本概念及使用方法。

1.1 简介

Java 早期的安全框架强调的是通过验证代码的来源和作者,保护用户避免受到下载下来的代码的攻击。JAAS 强调的是通过验证谁在运行代码以及他的权限来保护系统免受攻击。它让你能够将一些标准的安全机制,例如 Solaris NIS(网络信息服务)、Windows NT、LDAP(轻量目录存取协议),Kerberos 等通过一种通用的,可配置的方式集成到系统当中去。

Java 安全框架最初集中在保护用户运行潜在的不可信任代码,是基于代码的来源(URL)和谁创建的代码(certificate)来给移动代码进行授权。Java 2 SDK 1.3 引入了JAAS( Java Authentication and Authorization Service),增加了基于用户的访问控制能力,即根据谁在运行代码来进行授权。JAAS 已经整合进了Java 2 SDK 1.4,作为标准的用户认证与授权模型。

JAAS 认证被实现为可插拔的方式,允许应用程序同底层的具体认证技术保持独立,新增或者更新认证方法并不需要更改应用程序本身。应用程序通过实例化 LoginContext 对象开始认证过程,引用配置文件中的具体认证方法,即 LoginModule 对象,来执行认证。

1.2 核心类及接口

1.2.1 认证

Subject

认证的主体,可以代表一个人,一个角色一个进程等。认证成功之后可以从 LoginContext 获取 Subject,代表一种身份,用户可以通过这个身份执行一些需要权限才能运行的逻辑。

Principals

可认为是一种权限。一个 Subject 可以包含多个 Principal。用户认证成功之后,把授予的 Principal 加入到和该用户关联的 Subject 中,该用户便具有了这个 Principal 的权限。

Credentials

凭证,包括公共凭证(Public credentials,如:名称或公共密钥)和私有凭证(Private credentials,如:口令或私有密钥)。凭证并不是一个特定的类或借口,它可以是任何对象。凭证中可以包含任何特定安全系统需要的验证信息,例如标签(ticket),密钥或口令。

LoginContext

LoginContext 提供了对 Subject 进行身份验证的基本方法,并提供了一种不依赖于底层身份验证技术的应用程序开发方法。LoginContext 查询配置以确定为应用程序配置的身份验证服务或LoginModule。因此,可以在应用程序下插入不同的 Loginmodule,而不需要对应用程序本身进行任何修改。

LoginModule

登录模块,不同的 LoginModule 对应了不同的认证方式。例如:Krb5LoginModule 使用 Kerberos 作为认证方式,FileLoginModule 使用外部保存的密码文件来认证,UsernameLoginModule 通过用户名密码来认证。

CallbackHandler

在某些情况下,LoginModule 需要与用户通信以获取身份验证信息;LoginModule 使用 CallbackHandler 来实现此目标。应用程序实现 CallbackHandler 接口并将其传递给 LoginContext, LoginContext 将其直接转发给底层 LoginModule。LoginModule 使用 CallbackHandler 来收集来自用户的输入(例如密码或智能卡 pin 号)或向用户提供信息(例如状态信息)。应用重新通过指定 CallbackHandler 来实现与用户交互的各种不同方式。例如,GUI 应用程序CallbackHandler 的实现可能会显示一个窗口来征求用户的输入。另一方面,非 GUI 工具的 CallbackHandler 的实现可能只是简单地提示用户直接从命令行输入。

Callback

javax.security.auth.callback 包中包含了 Callback 接口及几个实现类。LoginModules 将一个 Callback 数组传递给 CallbackHandler 的 handle 方法。

1.2.2 授权

Policy

policy 类是一个抽象类,用于表示系统范围的访问控制策略。policy 有对应的 policy 文件来配置访问权限。

AuthPermission

AuthPermission 类封装了 JAAS 所需的基本权限;AuthPermission包含一个名称,但不包含操作列表。

PrivateCredentialPermission

PrivateCredentialPermission 类用于保护对 Subject 私有凭证的访问。

1.3 配置文件

1.3.1 JAAS Login Configuration File

JAAS 登录配置文件,主要用于用户的认证;一个配置文件中可以包含多个 entry,一个 entry 的格式如下:

{

;

;

};

一个 entry 的配置主要包括 LoginModule、flag、LoginModule options,一个配置例子如下:

MyLogin {

com.abc.demo.general.jaas.MyLoginModule required;

};

LoginModule LoginModule 的类型决定了使用何种方式认证,需要配置为全限定名。一个 entry 可以配置多个 LoginModule,认证流程会按照顺序依次执行各个 LoginModule,flag 的值会影响认证流程的走向。flag

Required:此 LoginModule 必须成功;无论成功与否,认证流程都会继续走后面的 LoginModule。Requisite:此 LoginModule 必须成功;如果成功,认证流程会继续走后面的 LoginModule,如果失败,认证流程终止。Sufficient:此 LoginModule 不必成功;如果成功,认证流程终止,如果失败,认证流程会继续走后面的 LoginModule。Optional:此 LoginModule 不必成功;无论成功与否,认证流程都会继续走后面的 LoginModule。

整个认证流程是否成功的判定标准:

只有当所有的 Required 和 Requisite LoginModule 成功时,整个认证流程才成功。如果配置了 Sufficient LoginModule 并且认证成功,在它之前所有的 Required 和 Requisite LoginModule 都成功,整个认证流程才算成功。如果没有配置任何 Required 和 Requisite LoginModule,Sufficient 或 Optional LoginModule 至少成功一个,整个认证流程才算成功。

options options 为 LoginModule 接收的认证选项。格式为 0 或多个 key=value,常用于传递认证附属参数。在 LoginModule 中使用一个 Map 来接收并处理这些参数。

1.3.2 Policy File

policy 文件主要用来配置权限,其格式为:

复制代码

grant signedBy "signer_names", codeBase "URL",

principal principal_class_name "principal_name",

principal principal_class_name "principal_name",

... {

permission permission_class_name "target_name", "action",

signedBy "signer_names";

permission permission_class_name "target_name", "action",

signedBy "signer_names";

...

};

signedBy 针对某个签名者赋予权限。可使用 jarsigner xxx.jar signer_name 为 jar 文件签名。codeBase 用于为某个目录下的用户代码授权。principal 用于为特定的 Principal 授权。permission permission 部分为赋予的具体权限;permission_class_name 表示权限名称,target_name 表示权限作用的目标,action 表示权限;如:

//读取d盘各级目录所有文件的权限

permission java.io.FilePermission "d:/-", "read";

//读取/设置环境变量的权限

permission java.util.PropertyPermission "os.name", "read";

permission java.util.PropertyPermission "java.security.auth.login.config", "write";

复制代码

JAAS 支持很多种类型的权限,它们都继承至 java.security.Permission 类,可查看它的子类来确定具体的权限;常用的权限如下:

AuthPermission:认证操作权限

FilePermission:文件访问权限

PropertyPermission:属性访问权限

AllPermission:所有权限

SocketPermission:网络通信权限

1.3 示例代码

https://gitee.com/qp1886358/com.qupeng.demo.jaas

2 PGP

2.1 什么是 GPG

GPG 可以分成两部分来看:

第一部分:GPG 是一个加密、解密、签名、验证工具。前面介绍了加解密的基本概念、相关的算法。但这些概念和算法并不能直接使用,而 GPG 就是一个使用这些算法,对信息进行加密、解密、签名、验证的工具。具体介绍见如何使用部分。

第二部分:GPG 还是一个密钥管理工具,可以用于管理自己的私钥,其他人的公钥,以及提供了一套公钥信任体系。前面加解密概念中提到,公钥要广而告之,GPG 可以帮助我们更加安全高效的交换公钥。具体介绍见管理密钥部分。

另外在使用 GPG 时,可能还会看到 PGP、OpenPGP 等名词,他们是什么关系?PGP (Pretty Good Privacy) 是最早的于 1991 年发布的此类工具。PGP 很好用,但他是商业软件。于是 GNU 计划在 1999 年发布了开源版本的 GNU Privacy Guard (GnuPG 或 GPG)。而 OpenPGP 是于 1997 年制定的一套标准,GPG、PGP 等工具都实现了这套标准。

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: