Duplicate Kafka transactional id

Error message:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1002)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:619)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Solution:

1. This is a bug present in Flink versions below 1.12.0; upgrade to Flink 1.14 or later.

2. Alternatively, take the following two measures:

In getKafkaProducer, add the following configuration:

    // checkpoint interval
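The rest of the original snippet is not shown above. As a hedged reading of the leading comment, the property that is usually tuned at this point is the producer's transaction.timeout.ms: Flink's exactly-once Kafka producer defaults it to 1 hour, while Kafka brokers cap open transactions at transaction.max.timeout.ms (15 minutes by default), so the value should be set above the checkpoint interval but within the broker limit. The sketch below is a minimal illustration under those assumptions; the getKafkaProducer signature, topic, and broker address are placeholders, not the article's actual code.

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KafkaSinkFactory {

        // Hypothetical helper standing in for the article's getKafkaProducer.
        // Key point: set transaction.timeout.ms explicitly so that it is larger
        // than the checkpoint interval but no larger than the broker's
        // transaction.max.timeout.ms (15 minutes by default); otherwise the
        // Flink default of 1 hour is rejected or the transaction gets expired.
        public static FlinkKafkaProducer<String> getKafkaProducer(String brokers, String topic) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", brokers);
            // checkpoint interval: transactions are only committed when a
            // checkpoint completes, so keep this comfortably above the interval
            props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

            KafkaSerializationSchema<String> schema = new KafkaSerializationSchema<String>() {
                @Override
                public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                    return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
                }
            };

            return new FlinkKafkaProducer<>(
                    topic,
                    schema,
                    props,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        }
    }

In the job itself, checkpointing would then be enabled with an interval well below that timeout, for example env.enableCheckpointing(60000) for a one-minute checkpoint.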
