Flink | Checkpoint 机制详解

Flink | Checkpoint 机制详解,第1张

一、Checkpoint 简介

Flink 的 Checkpoint 机制是其 可靠性 的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport (分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

二、Checkpoint 机制流程详解

1 任务启动

我们假设任务从 Kafka 的某个 Topic 中读取数据,该Topic 有 2 个 Partition,故任务的并行度为 2。根据读取到数据的奇偶性,将数据分发到两个 task 进行求和。

某一时刻,状态如下:

2启动Checkpoint

JobManager 根据 Checkpoint 间隔时间,启动 Checkpoint。此时会给每个 Source 发送一个 barrier 消息,消息中的数值表示 Checkpoint 的序号,每次启动新的 Checkpoint 该值都会递增。

3 Source启动Checkpoint

当Source接收到barrier消息,会将当前的状态(Partition、Offset)保存到 StateBackend,然后向 JobManager 报告Checkpoint 完成。之后Source会将barrier消息广播给下游的每一个 task:

4task 接收 barrier

当task接收到某个上游(如这里的Source1)发送来的barrier,会将该上游barrier之前的数据继续进行处理,而barrier之后发送来的消息不会进行处理,会被缓存起来。

之前对barrier的理解比较模糊,直到看到了下面这幅图。barrier的作用和这里 "欢迎光临" 牌子的作用类似,用于区分流中的数据属于哪一个 Checkpoint:

我们可以理解为:barrier之前的数据属于本次Checkpoint,barrier之后的数据属于下一次Checkpoint,所以下次Checkpoint的数据是不应该在本次Checkpoint过程中被计算的,因此会将数据进行缓存。

5barrier对齐

如果某个task有多个上游输入,如这里的 sum_even 有两个 Source 源,当接收到其中一个 Source 的barrier后,会等待其他 Source 的 barrier 到来。在此期间,接收到 barrier 的 Source 发来的数据不会处理,只会缓存(如下图中的数据4)。而未接收到 barrier 的 Source 发来的数据依然会进行处理,直到接收到该Source 发来的 barrier,这个过程称为 barrier的对齐

barrier是否对齐决定了程序实现的是 Exactly Once 还是 At Least Once:

如果不进行barrier对齐,那么这里 sum_even 在接收 Source2 的 barrier 之前,对于接收到 Source1的 数据4 ,不会进行缓存,而是直接进行计算,sum_even 的状态改为12,当接收到 Source2 的barrier,会将 sum_even 的状态 sum=12 进行持久化。如果本次Checkpoint成功,在进行下次 Checkpoint 前任务崩溃,会根据本次Checkpoint进行恢复。此时状态如下:

从这里我们就可以看出, Source1的数据4被计算了两次 。因此,Exactly Once语义下,必须进行barrier的对齐,而 At Least Once语义下 barrier 可以不对齐。

注意:barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如 reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对齐这个概念,都会实现Exactly Once语义,即使程序中配置了At Least Once 。

6处理缓存数据

当task接收到所有上游发送来的barrier,即可以认为当前task收到了本次 Checkpoint 的所有数据。之后 task 会将 barrier 继续发送给下游,然后处理缓存的数据,比如这里 sum_even 会处理 Source1 发送来的数据4 而且,在这个过程中 Source 会 继续读取数据 发送给下游,并不会中断。

7上报Checkpoint完成

当sink收到barrier后,会向JobManager上报本次Checkpoint完成。至此,本次Checkpoint结束,各阶段的状态均进行了持久化,可以用于后续的故障恢复。

有状态的函数和 *** 作在处理各个元素或者事件时存储数据,使得state称为任何类型的复杂 *** 作的关键构建部件,例如:

当一个应用程序搜索某些特定的事件模式时,状态会保存截止到目前为止遇到过的事件的顺序;

当每分钟聚合事件时,状态会保存挂起的聚合

当通过数据点来训练机器学习模型时,状态会保存当前版本的模型参数

为了使state容错,Flink需要识别state并 checkpoint 它, 在许多情况下,Flink还管理着应用程序的状态,这意味着Flink处理内存管理(如果需要,可能会将内存中的数据溢出到磁盘)来保存非常大的state。

这篇文档介绍了在开发应用程序时如何使用Flink的state 抽象概念。

在Flink中有两个基本的state:Keyed state和 Operator state

Keyed State 总是与key相关,并且只能应用于 KeyedStream 的函数和 *** 作中。

你可以认为 Keyed State 是一个已经分区或者划分的,每个state分区对应一个key的 Operator State , 每个 keyed-state 逻辑上与一个<并行 *** 作实例, 键>( <parallel-operator-instance, key> )绑定在一起,由于每个key属于唯一一个键控算子( keyed operator )的并行实例,我们可以简单地看作是 <operator, key> 。

Keyed State 可以进一步的组成 Key Group , Key Group 是Flink重新分配 Keyed State 的最小单元,这里有跟定义的最大并行数一样多的 Key Group ,在运行时 keyed operator 的并行实例与key一起为一个或者多个 Key Group 工作。

使用 Operator State (或者非键控的state)的话,每个算子状态绑定到一个并行算子实例中。 Kafka Connector 就是在Flink中使用 Operator State 的一个很好的例子,每个 Kafka consumer 的并行实例保存着一个 topic 分区和偏移量的map作为它的 Operator State 。

当并行数发生变化时, Operator State 接口支持在并行 *** 作实例中进行重新分配,这里有多种方法来进行重分配。

Keyed State和 Operator State存在两种形式:托管的和原生的。

托管的State( Managed State )由Flink运行时控制的数据结构表示, 例如内部哈希表或者RocksDB,例子是"ValueSate", "ListState"等。Flink运行时会对State编码并将它们写入checkpoint中。

原生State( Raw State )是算子保存它们自己的数据结构的state,当checkpoint时,它们仅仅写一串byte数组到checkpoint中。Flink并不知道State的数据结构,仅能看到原生的byte数组。

所有的数据流函数都可以使用托管state,但是原生state接口只能在实现operator时才能使用。使用托管State(而不是原生state)被推荐使用是因为使用托管state,当并行度发生变化时,Flink可以自动地重新分配state,同时还能更好地管理内存。

托管的键控state接口可以访问所有当前输入元素的key范围内的不同类型的state,这也就意味着这种类型的state只能被通过streamkeyBy()创建的KeyedStream使用。

现在我们首先来看一下可用的不同类型的state,然后在看它们是如何在程序中使用的,可用State的原形如下:

ValueState<T>:这里保存了一个可以更新和检索的值(由上述输入元素的key所限定,所以一个 *** 作的每个key可能有一个值)。这个值可以使用 update(T) 来更新,使用 T value() 来获取。

ListState<T>:这个保存了一个元素列表,你可以追加元素以及获取一个囊括当前所保存的所有元素的 Iterable ,元素可以通过调用 add(T) 来添加,而 Iterable 可以调用 Iterable<T> get() 来获取。

ReducingState<T>:这个保存了表示添加到state的所值的聚合的当个值,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的ReduceFunction聚合成一个聚合值。

FoldingState<T, ACC>:这将保存表示添加到状态的所有值的聚合的单个值,与ReducingState相反,聚合的数据类型可能跟添加到State的元素的数据类型不同,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的FoldFunction折叠成一个聚合值。

MapState<T>:这个保存了一个映射列表,你可以添加key-value对到state中并且检索一个包含所有当前保存的映射的Iterable。映射可以使用 put(UK, UV) 或者 putAll(Map<UK, UV>) 来添加。与key相关的value,可以使用 get(UK) 来获取,映射的迭代、keys及values可以分别调用 entries() , keys() 和 values() 来获取。

所有类型的state都有一个 clear() 方法来清除当前活动的key(及输入元素的key)的State。

注意: FoldingState会在下一个Flink版本中启用,并在将来的版本中彻底删除,将提供更加一般的替代方案。

值得注意的是这些State对象仅用于与State进行接口,State并不只保存在内存中,也可能会在磁盘中或者其他地方,第二个需要注意的是从State中获取的值依赖于输入元素的key,因此如果涉及的key不同,那么在一次调用用户函数中获得的值可能与另一次调用的值不同。

为了获得一个State句柄,你需要创建一个 StateDescriptor ,这个 StateDescriptor 保存了state的名称(接下来我们会讲到,你可以创建若干个state,但是它们必须有唯一的值以便你能够引用它们),State保存的值的类型以及用户自定义函数如:一个 ReduceFunction 。根据你想要检索的state的类型,你可以创建一个 ValueStateDescriptor , 一个 ListStateDescriptor , 一个 ReducingStateDescriptor , 一个 FoldingStateDescriptor 或者一个 MapStateDescriptor

State可以通过 RuntimeContext 来访问,所以只能在富函数中使用。 RichFunction 中的 RuntimeContext 有以下这些方法来访问state:

ValueState<T> getState(ValueStateDescriptor<T>)

ReducingState<T> getReducingState(ReduceingStateDescriptor<T>)

ListState<T> getListState(ListStateDescriptor<T>)

FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)

MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这个 FlatMapFunction 例子展示了所有部件如何组合在一起:

这个例子实现了一个简单的计数器,我们使用元组的第一个字段来进行分组(这个例子中,所有的key都是1),这个函数将计数和运行时总和保存在一个ValueState中,一旦计数大于2,就会发出平均值并清理state,因此我们又从0开始。请注意,如果我们在第一个字段中具有不同值的元组,则这将为每个不同的输入键保持不同的state值。

除了上述接口之外,Scala API还具有快捷方式在KeyedStream上通过有状态的 map() 或 flatMap() 函数获取当个ValueState, 用户定义的Function以一个Option形式来获取ValueState的当前值,并且必须返回一个更新的值来更新State。

为了使用托管的算子State,有状态的函数可以实现更加通用的CheckpointedFunction接口或者ListCheckpoint<T extends Serializable>接口

CheckpointedFunction接口可以通过不同的重分区模式来访问非键控的state,它需要实现两个方法:

无论何时执行checkpoint, snapshotState() 都会被调用,相应地,每次初始化用户定义的函数时,都会调用对应的 initializeState() ,当函数首次初始化时,或者当该函数实际上是从较早的检查点进行恢复时调用的。鉴于此, initializeState() 不仅是不同类型的状态被初始化的地方,而且还是state恢复逻辑的地方。

目前列表式托管算子状态是支持的,State要求是一个可序列化的彼此独立的列表,因此可以在重新调整后重新分配,换句话说,这些对象是可重新分配的非键控state的最小粒度。根据状态的访问方法,定义了一下重分配方案:

Even-split redistribution :每个算子返回一个State元素列表,

Union redistribution :每个算子返回一个State元素列表,

Task 执行

   Spark中每个Stage中的Task会被分配到一个Worker中的 -> Executor容器里面的 -> 一个线程池中被执行,Flink称每个Executor为一个TaskManager,每个TaskManager中会有多个slot作为内存隔离:

Spark:Worker  ——>   Executor  ——>  线程池  ——>  线程

Flink:  Worker  ——>   TaskManager  ——>  Slot  ——>  线程

Slot是TaskManager资源粒度的划分,每个Slot都有自己独立的内存。所有Slot平均分配TaskManger的内存,比如TaskManager分配给Solt的内存为8G,两个Slot,每个Slot的内存为4G,四个Slot,每个Slot的内存为2G,值得注意的是,Slot仅划分内存,不涉及cpu的划分。同时Slot是Flink中的任务执行器(类似Storm中Executor),每个Slot可以运行多个task,而且一个task会以单独的线程来运行。Slot主要的好处有以下几点:

可以起到隔离内存的作用,防止多个不同job的task竞争内存。

Slot的个数就代表了一个Flink程序的最高并行度,简化了性能调优的过程

允许多个Task共享Slot,提升了资源利用率,举一个实际的例子,kafka有3个partition,对应flink的source有3个task,而keyBy我们设置的并行度为20,这个时候如果Slot不能共享的话,需要占用23个Slot,如果允许共享的话,那么只需要20个Slot即可(Slot的默认共享规则计算为20个)。

很多朋友都想知道java flink是什么?下面就一起来了解一下吧~

1、Flink是什么

Java Apache Flink是一个开源的分布式,高性能,高可用,准确的流处理框架。支持实时流处理和批处理。

2、Flink特性

(1)支持批处理和数据流程序处理

(2)优雅流畅的支持java和scala api

(3)同时支持高吞吐量和低延迟

(4)支持事件处理和无序处理通过SataStream API,基于DataFlow数据流模型

(5)在不同的时间语义(时间时间,处理时间)下支持灵活的窗口(时间,技术,会话,自定义触发器)

(6)仅处理一次的容错担保

(7)自动反压机制

(8)图处理(批) 机器学习(批) 复杂事件处理(流)

(9)在dataSet(批处理)API中内置支持迭代程序(BSP)

(10)高效的自定义内存管理,和健壮的切换能力在in-memory和out-of-core中

(11)兼容hadoop的mapreduce和storm

(12)集成YARN,HDFS,Hbase 和其它hadoop生态系统的组件

3、Flink分布式执行

Flink分布式程序包含2个主要的进程:JobManager和TaskManager当程序运行时,不同的进程就会参与其中,包括Jobmanager、TaskManager和JobClient

Flink程序提交给JobClient,JobClient再提交到JobManager,JobManager负责资源的协调和Job的执行。一旦资源分配完成,task就会分配到不同的TaskManager,TaskManager会初始化线程去执行task,并根据程序的执行状态向JobManager反馈,执行的状态包括starting、in progress、finished以及canceled和failing等。当Job执行完成,结果会返回给客户端。

1流式计算分为无状态和有状态两种情况。 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。

(1)所有类型的窗口。例如,计算过去一小时的平均水位,就是有状态的计算。

(2)所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。

(3)流与流之间的所有关联 *** 作,以及流与静态表或动态表之间的关联 *** 作,都是有状态的计算。

2下图展示了无状态流处理和有状态流处理的主要区别。 无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。

上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。

3有状态的算子和应用程序

Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。

在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:

算子状态(operator state)

键控状态(keyed state)

4算子状态(operator state)

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。

Flink为算子状态提供三种基本数据结构:

列表状态(List state):将状态表示为一组数据的列表。

联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态

5键控状态(keyed state)

键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。

6状态后端(state backend)

每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)

状态后端主要负责两件事:

1)本地的状态管理

2)将检查点(checkpoint)状态写入远程存储

状态后端分类:

(1)MemoryStateBackend

内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。

(2)FsStateBackend

将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。

(3)RocksDBStateBackend

将所有状态序列化后,存入本地的RocksDB中存储。

7状态一致性

当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?

1)一致性级别

在流处理中,一致性可以分为3个级别:

(1) at-most-once : 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有udp。

(2) at-least-once : 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。

(3) exactly-once : 这指的是系统保证在发生故障后得到的计数结果与正确值一致。

曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二。

(1)保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性。

(2)流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。

最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。

Flink的一个重大价值在于, 它既保证了 exactly-once ,也具有低延迟和高吞吐的处理能力 。

从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。

2)端到端(end-to-end)状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:

1)source端 —— 需要外部源可重设数据的读取位置

2)link内部 —— 依赖checkpoint

3)sink端 —— 需要保证从故障恢复时,数据不会重复写入外部系统

而对于sink端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。

4)幂等写入

所谓幂等 *** 作,是说一个 *** 作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

5)事务写入

需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。

对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。

8检查点(checkpoint)

Flink具体如何保证exactly-once呢 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。

9Flink+Kafka如何实现端到端的exactly-once语义

我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

1)内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性

2)source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

3)sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction

内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。

我们知道Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

当checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。

每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。

每个内部的transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。

sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。

当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。

所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink *** 作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

具体的两阶段提交步骤总结如下:

1)第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”

2)触发 checkpoint *** 作,barrier从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知jobmanager

3)sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据

4)jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成

5)sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据

6)外部kafka关闭事务,提交的数据可以正常消费了。

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。

现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们所提供的SLA(Service-Level-Aggreement)是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理。

Flink从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的; 批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。

Flink流处理特性:

Flink以层级式系统形式组件其软件栈,不同层的栈建立在其下层基础上,并且各层接受程序不同层的抽象形式。

1 流、转换、 *** 作符

Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个 *** 作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。

Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

2 并行数据流

一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask,每一个Operator Subtask是在不同的线程中独立执行的。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度总是等于生成它的Operator的并行度。

One-to-one模式

比如从Source[1]到map()[1],它保持了Source的分区特性(Partitioning)和分区内元素处理的有序性,也就是说map()[1]的Subtask看到数据流中记录的顺序,与Source[1]中看到的记录顺序是一致的。

Redistribution模式

这种模式改变了输入数据流的分区,比如从map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多个不同的Subtask发送数据,改变了数据流的分区,这与实际应用所选择的Operator有关系。

3任务、 *** 作符链

Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。

4 时间

处理Stream中的记录时,记录中通常会包含各种典型的时间字段:

Event Time:表示事件创建时间

Ingestion Time:表示事件进入到Flink Dataflow的时间

Processing Time:表示某个Operator对事件进行处理的本地系统时间

Flink使用WaterMark衡量时间的时间,WaterMark携带时间戳t,并被插入到stream中。

5 窗口

Flink支持基于时间窗口 *** 作,也支持基于数据的窗口 *** 作:

窗口分类:

Tumbling/Sliding Time Window

// Stream of (sensorId, carCnt)

val vehicleCnts: DataStream[(Int, Int)] =

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts

// key stream by sensorId

keyBy(0)

// tumbling time window of 1 minute length

timeWindow(Timeminutes(1))

// compute sum over carCnt

sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts

keyBy(0)

// sliding time window of 1 minute length and 30 secs trigger interval

timeWindow(Timeminutes(1), Timeseconds(30))

sum(1)

Tumbling/Sliding Count Window

// Stream of (sensorId, carCnt)

val vehicleCnts: DataStream[(Int, Int)] =

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts

// key stream by sensorId

keyBy(0)

// tumbling count window of 100 elements size

countWindow(100)

// compute the carCnt sum

sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts

keyBy(0)

// sliding count window of 100 elements size and 10 elements trigger interval

countWindow(100, 10)

sum(1)

自定义窗口

基本 *** 作:

6 容错

Barrier机制:

对齐:

当Operator接收到多个输入的数据流时,需要在Snapshot Barrier中对数据流进行排列对齐:

基于Stream Aligning *** 作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。

CheckPoint:

Snapshot并不仅仅是对数据流做了一个状态的Checkpoint,它也包含了一个Operator内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。状态包含两种:

7 调度

在JobManager端,会接收到Client提交的JobGraph形式的Flink Job,JobManager会将一个JobGraph转换映射为一个ExecutionGraph,ExecutionGraph是JobGraph的并行表示,也就是实际JobManager调度一个Job在TaskManager上运行的逻辑视图。

物理上进行调度,基于资源的分配与使用的一个例子:

8 迭代

机器学习和图计算应用,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括Iterate和Delta Iterate两种类型。

Iterate

Iterate Operator是一种简单的迭代形式:每一轮迭代,Step函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果。

流程伪代码:

IterationState state = getInitialState();

while (!terminationCriterion()) {

state = step(state);

}

setFinalState(state);

Delta Iterate

Delta Iterate Operator实现了增量迭代。

流程伪代码:

IterationState workset = getInitialState();

IterationState solution = getInitialSolution();

while (!terminationCriterion()) {

(delta, workset) = step(workset, solution);

solutionupdate(delta)

}

setFinalState(solution);

最小值传播:

9 Back Pressure监控

流处理系统中,当下游Operator处理速度跟不上的情况,如果下游Operator能够将自己处理状态传播给上游Operator,使得上游Operator处理速度慢下来就会缓解上述问题,比如通过告警的方式通知现有流处理系统存在的问题。

Flink Web界面上提供了对运行Job的Backpressure行为的监控,它通过使用Sampling线程对正在运行的Task进行堆栈跟踪采样来实现。

默认情况下,JobManager会每间隔50ms触发对一个Job的每个Task依次进行100次堆栈跟踪调用,过计算得到一个比值,例如,radio=001,表示100次中仅有1次方法调用阻塞。Flink目前定义了如下Backpressure状态:

OK: 0 <= Ratio <= 010

LOW: 010 < Ratio <= 05

HIGH: 05 < Ratio <= 1

1 Table

Flink的Table API实现了使用类SQL进行流和批处理。

详情参考:>

Flink作为新的stream计算引擎,这两年社区的活跃度很高。对于Flink 既可以处理stream data也可以处理batch data,同时可以兼顾Spark以及Sparkstreaming的功能,与Spark不同的是,Flink本质上只有stream的概念,batch被认为是special stream。Flink主要有以下几个角色需要大家了解,对于Flink的开发是很有帮助的。也便于自己后期翻阅。

JobClient:

负责接收程序,解析和优化程序的执行计划,然后提交执行计划到JobManager。这里执行的程序优化是将相邻的Operator融合,形成OperatorChain,Operator的融合可以减少task的数量,提高TaskManager的资源利用率。

JobManagers:

负责申请资源,协调以及控制整个job的执行过程,具体包括,调度任务、处理checkpoint、容错等等

TaskManager:

TaskManager运行在不同节点上的JVM进程(process),负责接收并执行JobManager发送的task,并且与JobManager通信,反馈任务状态信息,如果说JobManager是master的话,那么TaskManager就是worker用于执行任务。每个TaskManager像是一个容器

,包含一个或者多个Slot。

Slot:

Slot是TaskManager资源粒度的划分,每个Slot都有自己独立的内存。所有Slot平均分配TaskManager的内存,值得注意的是,Slot仅划分内存,不涉及cpu的划分。每个Slot可以运行多个task。Slot的个数就代表了一个程序的最高并行度。

Task:

Task是在operators的subtask进行链化之后形成的,具体Flink job中有多少task和operator的并行度和链化的策略有关,为了方便大家理解,可以参考图5中所示的理解。

SubTask:

因为Flink是分布式部署的,程序中的每个算子,在实际执行中被分隔为一个或者多个subtask,运算符子任务(subtask)的数量是该特定运算符的并行度。数据流在算子之间流动,就对应到SubTask之间的数据传输。Flink允许同一个job中来自不同task的subtask可以共享同一个slot。每个slot可以执行一个并行的pipeline。可以将pipeline看作是多个subtask的组成的。

Flink程序本质上是并行和分布式的。在程序执行期间,一个流会生成一个或者多个stream partition,并且一个operator会生成一个或者多个operator subtask。operator的 subtask 彼此之间是独立的,分别在不同的线程里去执行并且可能分布在不同的机器上或者containers上。

operator的subtasks的数量等于该 *** 作算子的并行度的数量。流的并行度有总是取决于产生它的 *** 作算子的并行度决定的。同一个flink程序中的不同的operators可能有不同的并行度。

数据流在两个operators之间进行传递的方式有两种:one-to-one 模式 和 redistributing 模式

①:one-to-one 模式:两个operator用此模式传递的时候,会保持数据的分区数和数据的排序,比如:在下图中Source和map() operators之间的数据传递方式;

②:Redistributing 模式:这种模式会改变数据的分区数;每个一个operator subtask会根据选择transformation把数据发送到不同的目标subtasks,比如keyBy()会通过hashcode重新分区,broadcast()和rebalance()方法会随机重新分区,比如:在下图中map()和keyBy/window ,keyBy/window和Sink之间的数据传递方式;

对于分布式计算,Flink将operator 的subtasks链化在一起形成tasks。每个task在一个线程中被执行。将operators链化在一起形成tasks是比较好的一个优化:他减少了线程和线程之间的切换和缓冲的开销,增加了吞吐量降低了延迟。对于operator的链化行为,可以根据个人来去调整。详情参考 官网

下图中operators经过链化之后,仅仅需要5个并行的线程。

①每一个worker(TaskManager) 都是一个JVM进程,他可能会在独立的线程中执行一个或者多个subtask。为了控制worker能够接收多个task。worker通过task slot来进行控制(一个worker至少有一个task slot)。

②每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。slot的资源化意味着一个job的subtask将不需要跟来自其它job的subtask竞争被管理的内存。

③通过调整task slots的数量,用户可以定义subtasks它们之间如何互相隔离。如果一个TaskManager一个slot,那将意味着每个task group独立的运行在JVM中。而一个TaskManager多个slot意味着更多的subtask可以共享一个JVM。而在同一个JVM进程中的task将共享TCP连接和心跳消息。它们也可能共享数据集和数据结构,这样可以减少每个task的负载。

默认,如果subtask是来自相同的job,但不是相同的task,Flink允许subtask共享slot。这样就会出现一个slot可能容纳一个job中的整个pipeline。允许slot共享有以下两个好处:

① Flink集群需要的task slots的数量和作业中的最高并行度的一致。不需要计算一个程序总共包含多少个task。

②更好的利用资源。如果没有slot共享,非密集型source/map()子任务将阻塞与资源密集型窗口子任务一样多的资源;在slot共享的话,将我们图6的示例中的基本并行度从2提高到6,可以充分利用slot资源,同时确保繁重的subtasks在Taskmanager中公平分配。

Flink具体如何保证exactly-once呢 它使用一种被称为"检查点"( checkpoint )的特性,在出现 故障时将系统重置回正确状态 。下面通过简单的类比来解释检查点的作用。

假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢 如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。

于是,你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开; 当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。

Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。

checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性

每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator ,CheckpointCoordinator全权负责本应用的快照制作。

流的barrier是Flink的Checkpoint中的一个核心概念 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动 (有点类似于Watermark) 这些barrier不会跨越流中的数据

每个barrier会把数据流分成两部分: 一部分数据进入 当前的快照 , 另一部分数据进入 下一个快照 每个barrier携带着快照的id barrier 不会暂停数据 的流动, 所以非常轻量级 在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照

Job Manager 对每一个job都会产生一个 Checkpoint Coordinator

向所有 source 节点 触发 trigger Checkpoint 节点, 并行度是几,就会触发多少个。

source 会向流中触发 Barrier ,接收到 Barrier 的节点就会保存快照(包括source)。

表示两秒钟 source 向流中触发一次 Barrier

source先收到 barrier ,然后往后传递,若是多并行度,相当于多组接力赛跑比赛,所以顺序是不一致的,并不是同步。

在多并行度下, 如果要实现 严格一次 , 则要执行 barrier对齐

当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。

会 重复消费 , 就是至少一次语义

以上就是关于Flink | Checkpoint 机制详解全部的内容,包括:Flink | Checkpoint 机制详解、Flink 有状态的流的工作(Working with state)、Flink工作原理等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zz/9767512.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-01
下一篇2023-05-01

发表评论

登录后才能评论

评论列表(0条)

    保存