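I. Background

As Kafka users, we inevitably have to decide how many times a failed message should be retried, and what happens when it still fails after those retries: block the consumer, discard the message, or save it somewhere for later handling.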

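II. Setting the retry count for failed consumption

1 Where to find the default retry count

In spring-kafka 3.0 a failing record is delivered 10 times by default; more precisely, 1 normal call plus 9 retries. You can see this in the class org.springframework.kafka.listener.SeekUtils (the constants DEFAULT_MAX_FAILURES and DEFAULT_BACK_OFF).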
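To make the "1 normal call + 9 retries" counting concrete, here is a minimal simulation in plain Java with no Spring classes involved. It assumes the behaviour is simply "deliver once, then redeliver up to maxRetries times while the listener keeps throwing". RetryCountSketch and deliverUntilExhausted are hypothetical names used only for illustration; they are not part of spring-kafka.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, Spring-free model of "1 normal call + N retries".
final class RetryCountSketch {

    /**
     * Calls the listener once, then up to maxRetries more times while it
     * keeps throwing. Returns true as soon as one call succeeds.
     */
    static boolean deliverUntilExhausted(Runnable listener, long maxRetries) {
        for (long attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                listener.run();
                return true;
            } catch (RuntimeException e) {
                // A real error handler would wait for the back-off interval here.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        boolean ok = deliverUntilExhausted(() -> {
            calls.incrementAndGet();
            throw new IllegalStateException("always fails");
        }, 9);
        // prints: succeeded=false, listener calls=10
        System.out.println("succeeded=" + ok + ", listener calls=" + calls.get());
    }
}
```

With a retry budget of 9, a listener that always fails is invoked 10 times, which matches the default described above.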

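2 How to change the retry count

In my experiments with spring-kafka 3.0, this cannot be done through application.yml; I could not find any property that changes the retry count. Many posts online claim that spring.kafka.consumer.retries does it. I tried it, and at least in 3.0 it has no effect. If you have managed to configure the consumer retry count through application.yml, please leave a comment and let me know, thanks.

After a lot of trial and error, the only approach I found that works is Java configuration, which is also more flexible and fine-grained than application.yml. Here is the code: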

public CommonErrorHandler commonErrorHandler() {
    // Wait 5000 ms between attempts and retry at most 3 times (4 deliveries in total)
    BackOff backOff = new FixedBackOff(5000L, 3L);
    return new DefaultErrorHandler(backOff);
}

Then register this handler on the ConcurrentKafkaListenerContainerFactory with setCommonErrorHandler, as done in the KafkaConsumerConfig class in section IV.

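III. Handling messages that still fail after the retries

1 Save to a database for a later retry

Pass a ConsumerAwareRecordRecoverer as an additional argument when creating the DefaultErrorHandler. Once the 3 retries are used up and the record still fails, the recoverer is called and can save the record to the database. Note that after the save succeeds, I do not think it is necessary to call consumer.commitSync(). First, the no-argument consumer.commitSync() commits the latest offsets returned by the last poll, that is, the end of the current batch, which can lose messages that have not been processed yet. Second, even without that commit the consumer goes on to consume the following records, and as soon as one of them is consumed successfully its offset is committed, which supersedes this one.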

public CommonErrorHandler commonErrorHandler() {
    // Fixed back-off: 5000 ms between attempts, at most 3 retries
    BackOff backOff = new FixedBackOff(5000L, 3L);
    DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
        // Called once the retries are exhausted; persist the record here
        log.info("save to db {}", record.value());
    }, backOff);
    return defaultErrorHandler;
}

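If you do insist on committing, you can try the variant below, which commits an explicit offset for the failed record's partition only. The value committed is record.offset() + 1, because a committed offset is the position of the next record to read; committing record.offset() itself would cause the failed record to be delivered again after a restart or rebalance.

public CommonErrorHandler commonErrorHandler() {
    // Fixed back-off: 5000 ms between attempts, at most 3 retries
    BackOff backOff = new FixedBackOff(5000L, 3L);
    DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
        log.info("save to db {}", record.value());
        // Commit only this partition, pointing at the record after the failed one
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(offsets);
    }, backOff);
    return defaultErrorHandler;
}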
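When committing a specific offset by hand, it helps to remember that the offset Kafka stores for a consumer group is the position of the next record to read, not the last record processed. The sketch below models this in plain Java, using a list index as a stand-in for the partition offset. OffsetCommitSketch and its methods are hypothetical names for illustration only.

```java
import java.util.List;

// Hypothetical in-memory model: list index == partition offset.
final class OffsetCommitSketch {

    /** After a restart, a consumer resumes reading at the committed offset. */
    static List<String> recordsAfterRestart(List<String> partition, long committedOffset) {
        return partition.subList((int) committedOffset, partition.size());
    }

    /** Offset to commit so that the record at failedOffset is not delivered again. */
    static long offsetToSkip(long failedOffset) {
        return failedOffset + 1;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("m0", "m1-bad", "m2");
        long failedOffset = 1;
        // Committing the failed record's own offset: it is read again -> [m1-bad, m2]
        System.out.println(recordsAfterRestart(partition, failedOffset));
        // Committing offset + 1: the failed record is skipped -> [m2]
        System.out.println(recordsAfterRestart(partition, offsetToSkip(failedOffset)));
    }
}
```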

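2 Send to a Kafka dead-letter topic

Again, when creating the DefaultErrorHandler, just pass in a DeadLetterPublishingRecoverer. By default it uses the given kafkaTemplate to publish the failed record to a topic whose name is the original topic plus the suffix .DLT, i.e. your_topic.DLT.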

@Autowired
private KafkaTemplate<?, ?> kafkaTemplate;

public CommonErrorHandler commonErrorHandler() {
    // Fixed back-off: 5000 ms between attempts, at most 3 retries
    BackOff backOff = new FixedBackOff(5000L, 3L);
    // After the retries are exhausted, publish the record to <topic>.DLT
    DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
    return defaultErrorHandler;
}
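The default routing rule can be pictured with a small plain-Java sketch. It assumes the spring-kafka 3.0 default of appending .DLT to the original topic name and keeping the original partition number, which in turn means the dead-letter topic needs at least as many partitions as the original topic. DltRoutingSketch and defaultDestination are hypothetical names; the real recoverer takes its own destination resolver.

```java
import java.util.Map;

// Hypothetical, Spring-free model of the default dead-letter destination rule.
final class DltRoutingSketch {

    /**
     * Returns (topic, partition) for the dead-letter record:
     * original topic name + ".DLT", same partition number.
     */
    static Map.Entry<String, Integer> defaultDestination(String originalTopic, int originalPartition) {
        return Map.entry(originalTopic + ".DLT", originalPartition);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> destination = defaultDestination("order-message-topic", 1);
        // prints: order-message-topic.DLT, partition 1
        System.out.println(destination.getKey() + ", partition " + destination.getValue());
    }
}
```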

These are the two ready-made ConsumerRecordRecoverer variants I found: ConsumerAwareRecordRecoverer and DeadLetterPublishingRecoverer.

IV. Full consumer code

1 application.yml

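kafka-consumer:
  bootstrapServers: 192.168.31.114:9092
  groupId: goods-center
  # If the broker receives no heartbeat from the background heartbeat thread within 30 s, it triggers a rebalance
  sessionTimeOut: 30000
  autoOffsetReset: latest
  # Disable Kafka's own auto-commit; the Spring listener container then commits offsets according to its AckMode
  enableAutoCommit: false
  # Auto-commit interval in ms (only relevant when enableAutoCommit is true)
  autoCommitInterval: 1000
  # Minimum number of bytes per fetch
  fetchMinSize: 1
  # Maximum time in ms to wait for the minimum number of bytes
  fetchMaxWait: 500
  maxPollRecords: 50
  # Maximum interval between two polls is 300 s; if processing a batch takes longer, the consumer is removed from the group and its commit fails
  maxPollInterval: 300000
  # Heartbeat interval in ms
  heartbeatInterval: 10000
  keyDeserializer: org.apache.kafka.common.serialization.LongDeserializer
  valueDeserializer: org.springframework.kafka.support.serializer.JsonDeserializer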
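As a rough sanity check on the timing values above, the following plain-Java sketch computes how much processing time each record can take on average before a full batch overruns maxPollInterval, and checks the heartbeat interval against the guideline in the Kafka documentation of at most one third of the session timeout. ConsumerTimingSketch and its method names are hypothetical and exist only for this illustration.

```java
// Hypothetical sanity checks for the consumer timing values in the YAML above.
final class ConsumerTimingSketch {

    /** Average time one record may take before a full batch exceeds the max poll interval. */
    static long perRecordBudgetMillis(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    /** True if the heartbeat interval is at most one third of the session timeout. */
    static boolean heartbeatWithinGuideline(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // maxPollInterval 300000 ms, maxPollRecords 50 -> prints 6000
        System.out.println(perRecordBudgetMillis(300_000L, 50));
        // heartbeatInterval 10000 ms, sessionTimeOut 30000 ms -> prints true
        System.out.println(heartbeatWithinGuideline(10_000L, 30_000L));
    }
}
```

With these settings a batch of 50 records has to be processed at an average of no more than 6 seconds per record, which is worth keeping in mind when choosing the back-off interval for retries.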

2 KafkaListenerProperties

package com.ychen.goodscenter.fafka;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
// Prefix of the properties in the configuration file
@ConfigurationProperties(prefix = "kafka-consumer")
@Getter
@Setter
public class KafkaListenerProperties {

    private String groupId;
    private String sessionTimeOut;
    private String bootstrapServers;
    private String autoOffsetReset;
    private boolean enableAutoCommit;
    private String autoCommitInterval;
    private String fetchMinSize;
    private String fetchMaxWait;
    private String maxPollRecords;
    private String maxPollInterval;
    private String heartbeatInterval;
    private String keyDeserializer;
    private String valueDeserializer;
}

3 KafkaConsumerConfig

package com.ychen.goodscenter.fafka;

import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableConfigurationProperties(KafkaListenerProperties.class)
@Slf4j
public class KafkaConsumerConfig {

    @Autowired
    private KafkaListenerProperties kafkaListenerProperties;

    @Autowired
    private KafkaTemplate<?, ?> kafkaTemplate;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Concurrency: consumer threads per instance; partitions are shared among all instances of the service
        factory.setConcurrency(2);
        // factory.setBatchListener(true);
        factory.setCommonErrorHandler(commonErrorHandler());
        ContainerProperties containerProperties = factory.getContainerProperties();
        // Manual acknowledgment: the offset is committed as soon as the listener calls acknowledge()
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("Consumer configuration: {}", JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }

    public CommonErrorHandler commonErrorHandler() {
        // Fixed back-off: 5000 ms between attempts, at most 3 retries
        BackOff backOff = new FixedBackOff(5000L, 3L);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
        // Alternative: save to the database and commit explicitly
        // (requires imports for TopicPartition and OffsetAndMetadata)
        // DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
        //     log.info("save to db {}", record.value());
        //     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        //     offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
        //     consumer.commitSync(offsets);
        // }, backOff);
        return defaultErrorHandler;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Broker addresses
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrapServers());
        // Whether Kafka auto-commits offsets
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerProperties.isEnableAutoCommit());
        // Auto-commit interval
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getAutoCommitInterval());
        // Session timeout
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerProperties.getSessionTimeOut());
        // Key deserializer
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getKeyDeserializer());
        // Value deserializer
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getValueDeserializer());
        // Heartbeat interval
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getHeartbeatInterval());
        // Consumer group id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroupId());
        // Where to start when there is no committed offset
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerProperties.getAutoOffsetReset());
        // Maximum number of records per poll
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerProperties.getMaxPollRecords());
        // Maximum interval between polls
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaListenerProperties.getMaxPollInterval());
        propsMap.put("spring.json.trusted.packages", "com.ychen.**");
        return propsMap;
    }
}

4 MessageListener

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {

    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic", containerFactory = "kafkaListenerContainerFactory")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        log.info("order-message-topic message Listener, Thread ID: {}", Thread.currentThread().getId());
        try {
            log.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
            orderService.submitOrder(record.value());
            // Commit the offset right away (AckMode.MANUAL_IMMEDIATE)
            acknowledgment.acknowledge();
            log.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            // Duplicate data: log it, ignore it, and acknowledge so it is not redelivered
            log.error("order-message-topic message error DuplicateKeyException", dupe);
            acknowledgment.acknowledge();
        }
        // Any other exception propagates to the DefaultErrorHandler, which applies the retry policy
    }
}
