文章目录

1. 背景2. 环境3. 操作步骤3.1 生成SSL证书3.2 配置zookeeper认证3.3 配置kafka安全认证3.4 使用kafka客户端进行验证3.5 使用Java端代码进行认证

1. 背景

kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。

SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。

在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。

因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。

2. 环境

操作系统:linuxkafka版本:kafka_2.13-2.7.1zookeeper版本:apache-zookeeper-3.7.0应用程序版本:spring-boot-2.6.7、JDK1.8

3. 操作步骤

生成SSL证书配置zookeeper配置kafka前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),在业务代码中使用请查看(3.5)

3.1 生成SSL证书

按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥

参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)

3.2 配置zookeeper认证

第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:

Server {

org.apache.zookeeper.server.auth.DigestLoginModule required

user_admin="1qaz@WSX"

user_kafka="1qaz@WSX";

};

第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"

3.3 配置kafka安全认证

第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:

kafka-server-jaas.conf

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="1qaz@WSX"

user_admin="1qaz@WSX"

user_kafka="1qaz@WSX";

};

Client {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="kafka"

password="1qaz@WSX";

};

kafka-client-jaas.conf

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="kafka"

password="1qaz@WSX";

};

第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:

增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf

...

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"

fi

...

**第三步:**修改 kafka 的 server.properties配置文件

#listeners=SSL://10.1.61.121:9092

host.name=node1

#listeners=PLAINTEXT://node1:9092,SSL://node1:9093

listeners=SASL_SSL://node1:9093

#advertised.listeners=SSL://node1:9092

advertised.listeners=SASL_SSL://node1:9093

ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks

ssl.keystore.password=Q06688

ssl.key.password=Q06688

ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks

ssl.truststore.password=Q06688

ssl.client.auth=required

ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

ssl.keystore.type=JKS

ssl.truststore.type=JKS

# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名

# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可

ssl.endpoint.identification.algorithm=

# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT

security.inter.broker.protocol=SASL_SSL

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

allow.everyone.if.no.acl.found=true

注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname

3.4 使用kafka客户端进行验证

第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。

consumer.properties:

bootstrap.servers=node1:9093

security.protocol=SASL_SSL

ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks

ssl.truststore.password=Q06688

sasl.mechanism=PLAIN

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

producer.properties:

bootstrap.servers=node1:9093

security.protocol=SASL_SSL

ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks

ssl.truststore.password=Q06688

sasl.mechanism=PLAIN

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)

#生产

crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093 --topic first --producer.config ../config/producer.properties

>aaa

>bbb

>ccc

>

#消费

crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties

aaa

bbb

ccc

3.5 使用Java端代码进行认证

第一步: yaml 配置文件

spring:

kafka:

bootstrap-servers: localhost:9093

properties:

sasl:

mechanism: PLAIN

jaas:

#此处填写 SASL登录时分配的用户名密码(注意password结尾;)

config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

security:

protocol: SASL_SSL

ssl:

trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jks

trust-store-password: Q06688

key-store-type: JKS

producer:

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

batch-size: 106384

acks: -1

retries: 3

properties:

linger-ms: 1

retry.backoff.ms: 1000

buffer-memory: 33554432

第二步: 使用 kafkaTemplate 的方式,配置一个 config

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.io.FileUtils;

import org.apache.kafka.clients.admin.AdminClientConfig;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.config.SaslConfigs;

import org.apache.kafka.common.config.SslConfigs;

import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;

import java.util.Map;

@Slf4j

@Configuration

public class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")

private String bootstrapServers;

@Value("${spring.kafka.producer.acks}")

private String acks;

@Value("${spring.kafka.producer.retries}")

private String retries;

@Value("${spring.kafka.producer.batch-size}")

private String batchSize;

@Value("${spring.kafka.producer.properties.linger-ms}")

private int lingerMs;

@Value("${spring.kafka.producer.properties.buffer-memory}")

private int bufferMemory;

@Value("${spring.kafka.producer.key-serializer}")

private String keySerializer;

@Value("${spring.kafka.producer.value-serializer}")

private String valueSerializer;

@Value("${spring.kafka.properties.security.protocol}")

private String protocol;

@Value("${spring.kafka.properties.sasl.mechanism}")

private String mechanism;

@Value("${spring.kafka.ssl.trust-store-location}")

private String trustStoreLocation;

@Value("${spring.kafka.ssl.trust-store-password}")

private String trustStorePassword;

@Value("${spring.kafka.ssl.key-store-type}")

private String keyStoreType;

@Value("${spring.kafka.properties.sasl.jaas.config}")

private String jaasConfig;

@Bean

public KafkaTemplate kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());

}

/**

* the producer factory config

*/

@Bean

public ProducerFactory producerFactory() {

Map props = new HashMap();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ProducerConfig.ACKS_CONFIG, acks);

props.put(ProducerConfig.RETRIES_CONFIG, retries);

props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);

props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);

props.put("security.protocol", protocol);

props.put(SaslConfigs.SASL_MECHANISM, mechanism);

props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);

props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);

props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);

props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

return new DefaultKafkaProducerFactory(props);

}

}

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: