1、下载安装zk,kafka...(大把教程,不在这里过多阐述)

2、引入pom

org.springframework.kafka

spring-kafka

3、Kafka配置

# kafka bootstrap-servers:连接kafka的地址,多个地址用逗号分隔

spring.kafka.bootstrap-servers=localhost:9092

# 设置是否批量消费,默认 single(单条),batch(批量)

spring.kafka.listener.type=single

## 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交

spring.kafka.listener.ack-mode=batch

4、生产者配置

# producer 消费生产者配置-----

# 应答级别

# acks=0 把消息发送到kafka就认为发送成功

# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功

# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功

spring.kafka.producer.acks=all

# 重试次数 若设置大于0的值,客户端会将发送失败的记录重新发送

spring.kafka.producer.retries=3

# 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少de请求中。这有助于提升客户端和服务器端的性能。

# 这个配置控制一个批次的默认大小(单位 byte)。16384是缺省的配置

spring.kafka.producer.batch-size=16384

# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka,33554432是缺省配置

spring.kafka.producer.buffer-memory=33554432

# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka

spring.kafka.producer.properties.linger.ms=1000

# KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms

spring.kafka.producer.properties.max.block.ms=6000

# 关键字的序列化类

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

# 值的序列化类

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

5、生产者发消息的工具类

import cn.hutool.json.JSONUtil;

import lombok.extern.slf4j.Slf4j;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.lang.NonNull;

import org.springframework.stereotype.Component;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

/**

* 生产者发送消息工具类

*

* @ClassName KafkaSenderUtils

* @Author destiny

* @Date 2023/4/21 11:04

*/

@Component

@Slf4j

public class KafkaSenderUtils {

private final KafkaTemplate kafkaTemplate;

/**

* 构造器方式注入 kafkaTemplate

*/

public KafkaSenderUtils(KafkaTemplate kafkaTemplate) {

this.kafkaTemplate = kafkaTemplate;

}

public void send(String topicName, String msg) {

try {

ListenableFuture> listenableFuture = kafkaTemplate.send(topicName, msg);

listenableFuture.addCallback(new ListenableFutureCallback>() {

@Override

public void onSuccess(SendResult result) {

log.info("发送成功回调:{}", JSONUtil.toJsonStr(result.getProducerRecord().value()));

}

@Override

public void onFailure(@NonNull Throwable ex) {

log.error(">>>>失败原因:{}", ex.getMessage());

log.info("发送失败回调");

}

});

} catch (Exception e) {

log.info("发送异常");

e.printStackTrace();

}

}

}

6、消费着配置

# consumer 消费端的配置,需要给consumer配置一个group-id

spring.kafka.consumer.group-id=test

# key的编解码方法

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# value的编解码方法

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费

# earliest:无提交记录,从头开始消费

# latest:无提交记录,从最新的消息下一条开始消费

spring.kafka.consumer.auto-offset-reset=latest

# 是否自动提交偏移量offset 默认 true

spring.kafka.consumer.enable-auto-commit=false

# 自动提交的频率。前提是 enable-auto-commit=true 单位 ms

spring.kafka.consumer.auto-commit-interval=100ms

# 一次调用poll()返回的最大记录数,默认是500(批量消费的数量)

spring.kafka.consumer.max-poll-records=5

# session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作

spring.kafka.consumer.properties.session.timeout.ms=120000

# 请求超时 单位 ms

spring.kafka.consumer.properties.request.timeout.ms=120000

7、消费者配置类(配置批量消费)

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.config.KafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;

import java.util.HashMap;

import java.util.Map;

/**

* 卡夫卡消费者配置

*

* @ClassName KafkaConsumerConfig

* @Author destiny

* @Date 2023/2/2 18:53

*/

@Slf4j

@Configuration

@EnableKafka

public class KafkaConsumerConfig {

/**

* kafka 集群,broker-list

*/

@Value("${spring.kafka.bootstrap-servers}")

private String servers;

/**

* 开启自动提交

*/

@Value("${spring.kafka.consumer.enable-auto-commit}")

private boolean enableAutoCommit;

/**

* 消费者组

*/

@Value("${spring.kafka.consumer.group-id}")

private String groupId;

/**

* 重置消费者的offset

*/

@Value("${spring.kafka.consumer.auto-offset-reset}")

private String autoOffsetReset;

/**

* 批量拉取个数

*/

@Value("${spring.kafka.consumer.max-poll-records}")

private int maxPollRecords;

@Value("${spring.kafka.consumer.properties.session.timeout.ms}")

private String sessionTimeout;

@Value("${spring.kafka.consumer.properties.request.timeout.ms}")

private String requestTimeout;

/**

* 卡夫卡侦听器容器工厂

*

* @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link String}, {@link String}>>

*/

@Bean

public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

// 设置 consumerFactory

factory.setConsumerFactory(consumerFactory());

// 设置消费者组中的线程数量

factory.setConcurrency(3);

// 设置轮询超时

factory.getContainerProperties().setPollTimeout(3000);

return factory;

}

/**

* 消费者工厂

*

* @return {@link ConsumerFactory}<{@link Integer}, {@link String}>

*/

@Bean

public ConsumerFactory consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

/**

* 消费者配置

*

* @return {@link Map}<{@link String}, {@link Object}>

*/

@Bean

public Map consumerConfigs() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);

// 自动提交 offset 默认 true

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

// 自动提交的频率 单位 ms

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

// 批量消费最大数量

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

// 消费者组

props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

// session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);

// 请求超时

props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);

// Key 反序列化类

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Value 反序列化类

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 当kafka中没有初始offset或offset超出范围时将自动重置offset

// earliest:重置为分区中最小的offset

// latest:重置为分区中最新的offset(消费分区中新产生的数据)

// none:只要有一个分区不存在已提交的offset,就抛出异常

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

return props;

}

/**

* kafka批量监听

*

* @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link Integer}, {@link String}>>

*/

@Bean

public KafkaListenerContainerFactory> batchFactory() {

ConcurrentKafkaListenerContainerFactory factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

// 设置 consumerFactory

factory.setConsumerFactory(consumerFactory());

// 设置是否开启批量监听

factory.setBatchListener(true);

// 设置消费者组中的线程数量

factory.setConcurrency(3);

return factory;

}

/**

* 消费异常处理器

*

* @return {@link ConsumerAwareListenerErrorHandler}

*/

@Bean

public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {

return (message, exception, consumer) -> {

// 打印消费异常的消息和异常信息

log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId());

return exception;

};

}

}

8、测试类分别测试单条消费以及批量消费

import cn.hutool.json.JSONUtil;

import com.google.common.collect.Maps;

import com.xiaoju.framework.entity.common.ResultMap;

import lombok.extern.slf4j.Slf4j;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

import java.util.HashMap;

/**

* @ClassName KafkaDemo

* @Author destiny

* @Date 2023/2/1 16:32

*/

@RestController

@Slf4j

@RequestMapping(value = {"/api/kafka"})

public class KafkaDemo {

@Resource

private KafkaSenderUtils kafkaSenderUtils;

@GetMapping(value = {"/test"})

public void test() {

// 单条消费测试

String message = "message";

kafkaSenderUtils.send("testSingle", message);

}

@GetMapping(value = {"/testBatch"})

public void testBatch() {

// 批量消费测试

String message = "message";

for (int i = 1; i <= 20; i++) {

HashMap map = Maps.newHashMap();

map.put("id", i);

map.put("message", i + ":" + message + System.currentTimeMillis());

kafkaSenderUtils.send("batchTest", JSONUtil.toJsonStr(map));

}

}

}

9、消费者消费

package cn.ctyuncdn.consumer.kafka;

import cn.hutool.json.JSONUtil;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;

import java.util.List;

import java.util.stream.Collectors;

/**

* Kafka消费者

*

* @author destiny

* @date 2023/02/06

*/

@Component

@Slf4j

public class KafkaConsumer {

// 单条消费

@KafkaListener(id = "testSingle", topics = {"testSingle"}, groupId = "${spring.kafka.consumer.group-id}")

public void testSingle(ConsumerRecord record) {

String topic = record.topic();

String msg = record.value();

log.info("消费者接受消息:topic-->" + topic + ",msg->>" + msg);

}

// 批量消费

@KafkaListener(id = "batchTest", topics = {"batchTest"}, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory")

public void batchTest(List> records) {

log.info(">>>consumer batch size ===>>{}", records.size());

for (ConsumerRecord record : records) {

String topic = record.topic();

String msg = record.value();

log.info("消费者接受消息:topic-->" + topic + ",msg->>" + msg);

}

}

}

完结。。。

精彩链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: