1.原因

(1)kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。但是当我们直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset。等重启之后,少数消息就会再次消费一次 (2)在Kafka中有一个Partition Balance机制,就是把多个Partition均衡的分配给多个消费者。消费端会从分配到的Partition里面去消费消息,如果消费者在默认的5分钟内没有处理完这一批消息。就会触发Kafka的Rebalance机制,从而导致offset自动提交失败。而Rebalance之后,消费者还是会从之前没提交的offset位置开始消费,从而导致消息重复消费。

2.解决方案

开启kafka本身存在的幂等性: 注:​ 添加唯一ID,类似于数据库的主键,用于唯一标记一个消息。 ProducerID:#在每个新的Producer初始化时,会被分配一个唯一的PID SequenceNumber:#对于每个PID发送数据的每个Topic都对应一个从0开始单调递增的SN值将获取的唯一id存表,(利用mySQl的唯一键约束,或者redis天然的set结构)

相关链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: