1、背景 flink消费kafka数据,多并发,实现双流join 2、现象 (1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint (2)其中一个流的subtask显示finished (3)无背压 3、问题原因 (1)其中一个topic分区为1 (2)配置的并行度大于kafka的partition数,导致有部分subtask空闲,然后状态变为finished 在CheckpointCoordinator类的triggerCheckpoint方法中有如下代码段

// check if all tasks that we need to trigger are running.

// if not, abort the checkpoint

Execution[] executions = new Execution[tasksToTrigger.length];

for (int i = 0; i < tasksToTrigger.length; i++) {

Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();

if (ee == null) {

LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",

tasksToTrigger[i].getTaskNameWithSubtaskIndex(),

job);

throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);

} else if (ee.getState() == ExecutionState.RUNNING) {

executions[i] = ee;

} else {

LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",

tasksToTrigger[i].getTaskNameWithSubtaskIndex(),

job,

ExecutionState.RUNNING,

ee.getState());

throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);

}

ee.getState() == ExecutionState.RUNNING判断execution的状态是否为running,否则不做checkpoint 4、解决办法 将读取只有一个partition的topic的source任务并发改成1 5、结论 在消费kafka的数据时,source的并发度不能超过kafka的partition数,否则部分source无数据消费,导致finished,可以小于partition,但是部分subtask就会消费多个partition的数据,导致吞吐达不到最大,理想状态是source并发度等于partition数。 问题结论:在消费kafka的数据时,source的并发度不能超过kafka的partition数,可以小于partition,但是部分subtask就会消费多个partition的数据,导致吞吐达不到最大,理想状态是source并发度等于partition数。

与下面的同学遇到了一样的问题 链接:https://www.jianshu.com/p/9110ff473280

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: