1、一致性 一致性实际上是“正确性级别”的另一种说法,即在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者有多正确? 在流处理中,一致性分为3个级别:

at-most-once:这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。at-least-once:这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。exactly-once:这指的是系统保证在发生故障后得到的计数结果与正确值一致。 最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。 Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。 端到端严格一次即End-to-End Exactly-Once,从数据读取、引擎处理到写入外部存储的整个过程中,数据不重复、不丢失。端到端严格一次语义需要数据源支持可重放,外部存储支持事务机制,能够进行回滚。在Flink中,设计了两阶段提交协议,提供了框架级别的支持,即TwoPhaseCommitSinkFunctio 2、检查点:保证exactly-once Flink使用一种被称为“检查点”的特性,在出现故障时将系统重置回正确状态。 检查点是Flink最有价值的创新之一,因为它使Flink可以保证exactly-once,并且不需要牺牲性能。 Flink检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。 默认情况下,如果设置了检查点选项,则Flink只保留最近成功生成的一个检查点,而当Flink程序失败时,可以从最近的这个检查点来进行恢复。但是,如果希望保留多个检查点,并能够根据实际需要选择其中一个进行恢复,会更加灵活。默认情况下,检查点不会被保留,取消程序时即会删除它们,但是可以通过配置保留定期检查点,根据配置,当作业失败或者取消的时候,不会自动清除这些保留的检查点。 如果想保留检查点,那么Flink也设计了相关实现,可选项如下。ExternalizedCheckpointCleanup. RETAIN_ON_CANCELLATION:取消作业时保留检查点。在这种情况下,必须在取消后手动清理检查点状态。ExternalizedCheckpointCleanup. DELETE_ON_CANCELLATION:取消作业时删除检查点。只有在作业失败时检查点状态才可用。 保存点在Flink中叫作Savepoint,是基于Flink检查点机制的应用完整快照备份机制,用来保存状态,可以在另一个集群或者另一个时间点,从保存的状态中将作业恢复回来,适用于应用升级、集群迁移、Flink集群版本更新、A/B测试以及假定场景、暂停和重启、归档等场景。

3、作业恢复机制(容错机制) Flink提供了应用自动容错机制,可以减少人为干预,降低运维复杂度。 Flink提供了如下两种手动作业恢复方式: (1)外部检查点检查点完成时,在用户给定的外部持久化存储保存。当作业Failed(或者Cancled)时,外部存储的检查点会保留下来。用户在恢复时需要提供用于恢复的作业状态的检查点路径。 (2)保存点用户通过命令触发,由用户手动创建、清理。使用了标准化格式存储,允许作业升级或者配置变更。用户在恢复时需要提供用于恢复作业状态的保存点路径。 4、检查点的恢复 4.1 自动检查点的恢复 自动恢复可以在配置文件中提供全局配置,也可以在代码中为Job特别设定。 Flink提供了重启策略(restart strategy)使Job从最近一次checkpoint自动恢复现场。 作业常用的重启策略有以下三种: 1、固定延时重启(fixed-delay) 在Flink Job失败时,该策略按照restart-strategy.fixed-delay.delay参数给出的固定间隔试图重启Job。如果重启次数达到restart-strategy.fixed-delay.attempts参数规定的阈值之后还没有成功,就停止Job。 flink-conf.yaml中的配置: restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 10 restart-strategy.fixed-delay.delay: 15s 或者在代码里对每个Job进行配置,优先级比flink-conf.yaml高: env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 10, // attempts Time.seconds(15) // delay )); 2、按失败率重启(failure-rate) 在Flink Job失败时,该策略按照restart-strategy.failure-rate.delay参数给出的固定间隔试图重启Job。如果重启次数在restart-strategy.failure-rate.failure-rate-interval的时间周期内达到restart-strategy.failure-rate.max-failures-per-interval参数规定的阈值之后还没有成功,就停止Job。 如果启用了failure-rate重启策略,但没设定参数的话,Flink默认会将3个参数的值分别设定为1次、1分钟和akka.ask.timeout参数指定的超时时间。 flink-conf.yaml中的配置: restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 10 restart-strategy.failure-rate.failure-rate-interval: 300s restart-strategy.failure-rate.delay: 15s 在代码里对每个Job进行配置: env.setRestartStrategy(RestartStrategies.failureRateRestart( 10, // max-failures-per-interval Time.minutes(5), // failure-rate-interval Time.seconds(15) // delay )); 3、无重启(none) 顾名思义,Job出现意外时直接失败。 flink-conf.yaml中的配置: restart-strategy: none 在代码里对每个Job进行配置: env.setRestartStrategy(RestartStrategies.noRestart()); 4.2 手动检查点恢复 因为Flink检查点目录分别对应的是jobId ,每通过flink run方式/页面提交方式恢复都会重新生成jobId。Flink提供了在启动之时通过设置-s参数指定检查点目录的功能,让新的jobId读取该检查点元文件信息和状态信息,从而达到指定时间节点启动作业的目的。 手动启动方式: /bin/flink -s /flink/checkpointns/017891231312bv12g312hf3123/check-50/_metadata -p @Parallelism -c @Mainclass @jar 5、保存点的恢复 保存点需要考虑以下问题: (1)算子的顺序改变如果对应的UID没变,则可以恢复,如果对应的UID变了则恢复失败。 (2)作业中添加了新的算子如果是无状态算子,没有影响,可以正常恢复,如果是有状态的算子,跟无状态的算子一样处理。 (3)从作业中删除了一个有状态的算子默认需要恢复保存点中所记录的所有算子的状态,如果删除了一个有状态的算子,从保存点恢复的时候被删除的OperatorID找不到,所以会报错,可以通过在命令中添加-allowNonRestoredState (short: -n)跳过无法恢复的算子。 (4)添加和删除无状态的算子如果手动设置了UID,则可以恢复,保存点中不记录无状态的算子,如果是自动分配的UID,那么有状态算子的UID可能会变(Flink使用一个单调递增的计数器生成UID,DAG改版,计数器极有可能会变),很有可能恢复失败。 (5)恢复的时候调整并行度Flink1.2.0及以上版本,如果没有使用作废的API,则没问题;1.2.0以下版本需要首先升级到1.2.0才可以。 6、恢复时的时间问题 例如,从kafka消费数据中每1分钟统计一次结果,进行作业或者Flink集群版本升级的时候,停了3个小时,使用保存点进行恢复的时候,可能kafka中已经累积了3个小时的数据,使用事件时间可以保证最终的处理结果是正确的一致的。如果使用的是处理时间,在这3个小时内积压的数据,可能会在10分钟之内处理完毕,这3个小时内的统计结果是0,10分钟之内的统计结果暴涨了几十乃至几百倍。所以如果需要进行恢复、升级,最好使用事件时间,而不是处理时间。 7、一致性保障的重要组件 1、检查点协调器 在Flink中检查点协调器叫作CheckpointCoordinator,负责协调Flink算子的State的分布式快照。当触发快照的时候,CheckpointCoordinator向Source算子中注入Barrier消息,然后等待所有的Task通知检查点确认完成,同时持有所有Task在确认完成消息中上报的State句柄。 2、检查点消息 在执行检查点的过程中,TaskManager和JobManager之间通过消息确认检查点执行成功还是取消,Flink中设计了检查点消息类体系。检查点消息中有3个重要信息:该检查点所属的作业标识(JobID)、检查点编号、Task标识(ExecutionAttemptID)。 检查点消息类体系的结构如下,主要包含有以下两种: [图片] (1)AcknowledgeCheckpoint消息 该消息从TaskExecutor发往JobMaster,告知算子的快照备份完成。 (2)DeclineCheckpoint消息该消息从TaskExecutor发往JobMaster,告知算子无法执行快照备份,如Task了Running状态但是内部还没有准备好执行快照备份。 8、检查点的执行过程 JobMaster作为作业的管理者,是作业的控制中枢,在JobMaster中的CheckpointCoordinator组件专门负责检查点的管理,包括何时触发检查点、检查点完成的确认。检查点的具体执行者则是作业的各个Task,各个Task再将检查点的执行交给算子,算子是最底层的执行者。执行过程可用下面图表示: [图片] 主要操作: 1、JobMaster触发检查点 在JobMaster开始调度作业的时候,会为作业提供一个CheckpointCoordinator,周期性地触发检查点的执行。在CheckpointCoordinator触发检查点的时候,只需通知执行数据读取的Task(SourceTask),从SourceTask开始会产生CheckPointBarrier事件,注入数据流中,数据流向下游流动时被算子读取,在算子上触发检查点行为。 该过程的源码表示: [图片] [图片] 2、TaskExecutor执行检查点 JobMaster通过TaskManagerGateway触发TaskManager的检查点执行,TaskManager则转交给Task执行。Task类中的部分,该类创建了一个CheckpointMetaData的对象,确保Task处于Running状态,把工作转交给StreamTask。 源码如下: [图片] 从StreamTask开始,执行检查点就开始区分StreamTask类型了,其中SourceStreamTask是检查点的触发点,产生CheckpointBarrier并向下游广播,下游的StreamTask根据CheckpointBarrier触发检查点。 源码如下: [图片] [图片] 如果Task是Running状态,那就可以执行检查点,首先在OperatorChain上执行准备CheckpointBarrier的工作,然后向下游所有Task广播CheckpointBarrier,最后触发自己的检查点。这样做可以尽快将CheckpointBarrier广播到下游,避免影响下游CheckpointBarrier对齐,降低整个检查点执行过程的耗时。如果Task是非Running,那就要向下游发送CancelCheckpointMarker,通知下游取消本次检查点,方法是发送一个CacelCheckpointMarker,与CheckpointBarrier相反的操作。 源码如下: [图片] 在StreamTask中经过一系列简单调用之后,异步触发OperatorChain中所有算子的检查点。算子开始从StateBackend中深度复制State数据,并持久化到外部存储中。注册回调,执行完检查点后向JobMaster发出CompletedCheckPoint消息,这也是端到端Exactly-Once中两阶段提交的一部分。 算子需要保存原始State和托管State(OperatorState、KeyedState)触发保存快照的动作之后,首先对OperatorState和KeyeState分别进行处理,如果是异步的,则将状态写入外部存储。 当一个算子完成其State的持久化之后,就会向JobMaster发送检查点完成消息,其具体逻辑在reportCompletedSnapshotStates中。这个方法把任务又最终委托给了RpcCheckpointResponder这个类。 在向JobMaster汇报的消息中,TaskStateSnapshot中保存了本次检查点的State数据,如果是内存型的StateBackend,那么其中保存的是真实的State数据,如果是文件型的StateBackend,其中保存的则是状态的句柄(StateHandle)。在分布式文件系统中的保存路径也是通过TaskStateSnapshot中保存的信息恢复回来的。状态的句柄分为OperatorStateHandle和KeyedStateHandle,分别对应于OperatorState和KyedState,同时也区分了原始状态和托管状态。RpcCheckpointResponder底层依赖Flink的RPC框架的方式远程调用JobMaster的相关方法来完成报告事件。 3、JobMaster确认检查点 JobMaster通过调度器ScheduerNG任务把信息交给CheckpointCoordinator.receiveAcknowledgeMessage,来响应算子检查点完成事件。CheckpointCoordinator在触发检查点时,会生成一个PendingCheckpoint,保存所有算子的ID。当PendingCheckpoint收到一个算子的完成检查点的消息时,就把这个算子从未完成检查点的节点集合移动到已完成的集合。 当所有的算子都报告完成了检查点时,CheckpointCoordinator会触发completePendingCheckpoint()方法执行以下过程: 1)把pendinCgCheckpoint转换为CompletedCheckpoint。 2)把CompletedCheckpoint加入已完成的检查点集合,并从未完成检查点集合删除该检查点,CompletedCheckpoint中保存了状态的句柄、状态的存储路径、元信息的句柄等信息。 3)向各个算子发出RPC请求,通知该检查点已完成。 检查点执行过程总结 检查点分为两阶段:第一阶段由JobMaster触发检查点,各算子执行检查点并汇报;第二阶段是JobMaster确认检查点完成,并通知各个算子,在这个过程中,出现任何异常都会导致检查点失败,放弃本次检查点。 9、检查点的恢复 JobMaster会将恢复状态包装到Task的任务描述信息中,Task使用TaskStateSnapshot向JobMaster汇报自身的状态信息,恢复的时候也是使用TaskStateSnapshot对象。 作业状态以算子为粒度进行恢复,包括OperatorState恢复、KeyedState恢复、函数级别的状态恢复。恢复OperatorState是各算子的通用行为,对于UDF算子(继承自AbstractUdfStreamOperator的算子),则需要额外函数状态。另外对于特殊的算子,如WindowOperator,还需要恢复窗口状态,窗口状态为KeyedSate类型。

OperatorState恢复 在初始化算子状态的时候,从OperatorStateStore中获取ListState类型的状态,由OperatorStateStore负责从对应的StateBackend中读取状态重新赋予算子中的状态变量函数State恢复 函数的状态恢复主要是针对有状态函数,这些函数的共同特点是继承了CheckpointedFunction或者ListCheckpointed接口。在Flink内置的有状态函数主要是Source、Sink函数,为了支持端到端严格一次的情况,Source函数需要能够保存数据读取的断点位置,作业故障恢复后能够从断点位置开始读取,Kafka等连接器中的读取数据的函数就实现了CheckpointedFunction。同样在Sink写出数据到外部存储的时候,有时也会需要恢复状态,如BucketingSink和TwoPhaseCommitSinkFunction。Keyed状态恢复 WindowOperator是KeyedState的典型应用,其窗口使用了KeyedState在StateBackend中保存窗口数据、定时器等,在恢复的时候,除了恢复OperatorState和函数State之外,还进行了窗口定时器等State的恢复。 10、Exactly-once严格一致的保证 在分布式环境下,保证端到端严格一次是一件非常困难的事情,尤其是并行输出的时候。一般来说flink端到端任务的pipeline如下图表示: [图片] 该系统存在非一致性的典型案例: 1)Sink1已经往Kafka写入了数据,Sink2写入失败。 2)Flink应用此时进行Failover,系统回滚到最近的一次成功的检查点,但是Sink1已经把数据写入Kafka了。 3)Flink无法回滚Kafka的State.因此,Kafka将在之后再次接收到一份同样的来自Sink1的数据。 4)虽然在引擎内部保证了严格一次的处理,但是从整体效果来看是至少一次的处理,在某些情况下这是不可容忍的问题。 针对一致性问题,Flink设计实现了一种两阶段提交协议,能够保证从读取、计算到写出整个过程的端到端严格一次,无论是什么原因导致的作业失败,都严格保证数据只影响结果一次,即不会重复计算,或者保证重复计算不影响结果的正确性。 Flink支持的端到端严格一次并不只限于Kafka,理论上可以使用任何Source/Sink,只要它们满足如下条件: (1)数据源支持断点读取即能够记录上次读取的位置(offset或者其他可以标记的信息),失败之后能够从断点处继续读取。 (2)外部存储支持回滚机制或者满足幂等性 Flink端到端强一致性的保证:两阶段提交协议预提交阶段 当开始执行检查点的时候进入预提交阶段,JobMaster向Soure Task注入CheckpointBarrier,Source Task将CheckpointBarrier插入数据流,向下游广播开启本次快照。CheckpointBarrier除了起到触发检查点的作用,在两阶段提交协议中,还负责将流中所有消息分割成属于本次检查点的消息以及属于下次检查点的两个集合,每个集合表示一组需要提交的数据,即属于同一个事务。CheckpointBarrier从Source算子流向中间算子,一直到Sink。 在检查点开始的时候,Kafka Source在快照中保存KafkaTopic的offset偏移量,下游的算子一直到Kafka Sink依次完成快照的保存,JobMaster再确认完成,一旦失败就回滚到最后一次成功完成的检查点中保存的状态,从备份的Kafka Offset开始去读数据重新执行。这种方式只能在数据源Kafka到Flink内部保证严格一次,一旦涉及从Sink写入到外部Kafka就会出现问题了。在预提交阶段,Sink把要写入外部存储的数据以State的形式保存到状态后端存储(StateBackend)中,同时以事务的方式将数据写入外部存储。倘若在预提交阶段任何一个算子发生异常,导致检查点没有备份到状态后端存储,所有其他算子的检查点也必须被终止,Flink回滚到最近成功完成的检查点。 [图片]提交阶段 预提交阶段完成之后,下一步就是通知所有的算子,确认检查点已成功完成。然后进入第二阶段——提交阶段。该阶段中JobMaster会为作业中每个算子发起检查点已完成的回调逻辑。 在预提交阶段,数据实际上已经写入外部存储,但是因为事务的原因是不可读的,所以Sink在事务提交阶段的工作稍微简单一点,当所有的Sink实例提交成功之后,一旦预提交完成,必须确保提交外部事务也要成功,此时算子和外部系统协同来保证。倘若提交外部事务失败(如网络故障等),Flink应用就会崩溃,然后根据用户重启策略进行回滚,回滚到预提交时的状态,之后再次重试提交。这个过程至关重要,如果提交外部事务失败,就可能出现数据丢失的情况。 [图片] 总结来说,两阶段提交协议依赖于Flink的两阶段检查点机制,JobMaster触发检查点,所有算子完成各自快照备份即预提交阶段,在这个阶段Sink也要把待写出的数据备份到可靠的存储中,确保不会丢失,向支持外部事务的存储预提交,当检查点的第一阶段完成之后,JobMaster确认检查点完成,此时Sink提交才真正写入外部存储。 Flink两阶段提交的实现 针对两阶段提交,Flink抽取了公共逻辑并封装进TwoPhaseCommitSinkFunction抽象类。类体系结构如下: [图片] 从TwoPhaseCommitSinkFunction的类体系来看,其比较特殊的地方是继承了CheckpointedFunction接口,在预提交阶段,能够通过检查点将待写出的数据可靠地存储起来;继承了CheckpointListener接口,在提交阶段,能够接收JobMaster的确认通知,触发提交外部事务。 案例:若要实现支持端到端严格一次的文件Sink,主要以下4种方法: 1)beginTransaction。开启一个事务,在临时目录下创建一个临时文件,之后写入数据到该文件中。此过程为不同的事务创建隔离,避免数据混淆。 2)preCommit。在预提交阶段,将缓存数据块写出到创建的临时文件,然后关闭该文件,确保不再写入新数据到该文件,同时开启一个新事务,执行属于下一个检查点的写入操作。此过程用于准备需要提交的数据,并且将不同事务的数据隔离开来。 3)commit。在提交阶段,以原子操作的方式将上一阶段的文件写入真正的文件目录下。如果提交失败,Flink应用会重启,并调用TwoPhaseCommitSinkFunction#recoverAndCommit方法尝试恢复并重新提交事务。 4)abort。一旦终止事务,删除临时文件。 Flink应用预提交完成之后,若在提交完成之前崩溃了,会恢复到预提交的状态,此时很有可能Sink已经部分提交但没有完全完成,需要进行事务的回滚,所以在State中需要保存足够多的信息,使作业重启之后能够重新提交事务或者回滚事务。TwoPhaseCommitSinkFunction考虑了这种场景,因此应用从检查点恢复之后,TwoPhaseCommitSinkFunction总是会发起一个抢占式的提交。

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: