Flink on YARN时,如何确定TaskManager数

Flink on YARN时,如何确定TaskManager数,第1张

答案写在最前面:Job的最大并行度除以每个TaskManager分配的任务槽数。

在 Flink 15 Release Notes 中,有这样一段话,直接上截图。

这说明从15版本开始,Flink on YARN时的容器数量——亦即TaskManager数量——将由程序的并行度自动推算,也就是说flink run脚本的-yn/--yarncontainer参数不起作用了。那么自动推算的规则是什么呢?要弄清楚它,先来复习Flink的并行度(Parallelism)和任务槽(Task Slot)。

与Spark类似地,一个Flink Job在生成执行计划时也划分成多个Task。Task可以是Source、Sink、算子或算子链(算子链有点意思,之后会另写文章详细说的)。Task可以由多线程并发执行,每个线程处理Task输入数据的一个子集。而并发的数量就称为Parallelism,即并行度。

Flink程序中设定并行度有4种级别,从低到高分别为:算子级别、执行环境(ExecutionEnvironment)级别、客户端(命令行)级别、配置文件(flink-confyaml)级别。实际执行时,优先级则是反过来的,算子级别最高。简单示例如下。

Flink运行时由两个组件组成:JobManager与TaskManager,与Spark Standalone模式下的Master与Worker是同等概念。从官网抄来的图如下所示,很容易理解。

JobManager和TaskManager本质上都是JVM进程。为了提高Flink程序的运行效率和资源利用率,Flink在TaskManager中实现了任务槽(Task Slot)。任务槽是Flink计算资源的基本单位,每个任务槽可以在同一时间执行一个Task,而TaskManager可以拥有一个或者多个任务槽。

任务槽可以实现TaskManager中不同Task的资源隔离,不过是逻辑隔离,并且只隔离内存,亦即在调度层面认为每个任务槽“应该”得到taskmanagerheapsize的N分之一大小的内存。CPU资源不算在内。

TaskManager的任务槽个数在使用flink run脚本提交on YARN作业时用-ys/--yarnslots参数来指定,另外在flink-confyaml文件中也有默认值taskManagernumberOfTaskSlots。一般来讲,我们设定该参数时可以将它理解成一个TaskManager可以利用的CPU核心数,因此也要根据实际情况(集群的CPU资源和作业的计算量)来确定。

以Flink自带示例中简化的WordCount程序为例:

用--yarnslots 3参数来执行,即每个TaskManager分配3个任务槽。TaskManager、任务槽和任务的分布将如下图所示,方括号内的数字为并行线程的编号。

由图中可以看出,由于算子链机制的存在,KeyAgg与Sink *** 作链接在了一起,作为一个Task来执行。

Flink允许任务槽共享,即来自同一个Job的不同Task的Sub-Task(理解为Task的子集就行)进入同一个槽位,因此在图中也可以见到任务槽X中同时存在FlatMap[X]与KeyAgg[X]+Sink[X]。任务槽共享有两点好处:

所以,可以得出Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为20,每个TaskManager有两个任务槽的作业,就会启动10个TaskManager,如Web UI所示。

参考: >

对于流计算来说,最核心的概念就是无穷数据集,而用来处理无穷数据集的计算就可以称为流计算。面对无穷数据集,有多种多样的处理方式,但是大致上可以分为四类:

1、时间无关:最基础的场景就是Filter,我们只关心我们想要的数据,这跟数据源是否是无穷的、失序都没有关系了。

2、近似算法:比如近似Top-N、流K-means聚类等。他们都以无穷数据为输入,并计算出差不多你想要的结果。

3、窗口:而窗口其实就是对无穷数据集进行分片,一种化无穷为有穷的抽象概念。

显然,无穷数据集有N多分片的方式,因此也就对应着N多的窗口。而其中最为引人注目的就是按时间划分的窗口了,是的,没有比时间窗口更有吸引力的划分方式了。而在时间窗口中,核心的一个概念就是时间,在流计算中一般可以分为处理时间和事件时间,当然还可以定义更多时间的概念,这完全看你自己喽。在Flink中就有这么一个东西:摄入时间。在这里,想说的是,只有事件时间才能保证正确性,程序进行回放也能保证一致性。

对于无穷数据集,我们缺乏一种有效的方式来判断数据完整性,因此就有了WaterMark。watermark是建立在事件时间上的一个概念,用来刻画数据流的完整性。如果按照处理时间来衡量事件,一切都是有序的,完美的,自然而然也就无需watermark了。

换句话说事件时间引入了乱序的问题,而watermark就是用来解决乱序问题。所谓的乱序,其实就是有事件迟到了,对于迟到的元素,我们不可能无限期的等下去,必须要有一种机制来保证一个特定的时间后,必须触发window进行计算了。这个特别的机制,就是watermark,它告诉了算子时间不大于 也就是小于等于 WaterMark 的消息不应该再被接收(如果出现意味着延迟到达)。

备注:后边有句话,当watermark时间 >= window maxTimestamp时,就符合了window触发的条件了,可以帮助理解这句话

在Flink中,window是按照自然时间进行划分的,如果window大小是3秒,那么1分钟内会把window划分为如下的形式:

如果window大小是10秒,则window会被分为如下的形式:当然还有一个offset值可以控制window的起始值不是整点。

到 EventTimeTrigger 的 onElement 中看看: EventTimeTrigger 中当 ctxgetCurrentWatermark >= windowmaxTimestamp 时立刻触发窗口计算。

windowmaxTimestamp = 窗口结束时间 - 1,flink时间窗口的单位为ms,也就是时间戳,也就是说就差一毫秒,也不会触发窗口。

然后到调用Evictor的地方看看:没有内容是不会触发计算的

输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当 watermark 时间 >= window maxTimestamp 时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window maxTimestamp决定。

waterMark,checkpoint其实都是上游节点广播消息给下游节点来处理的行为(都是在流中插入一种特殊的数据结构来做处理)

即按照固定的时间间隔周期的生成水位线。这个时间间隔可以通过 ExecutionConfigsetAutoWatermarkInterval() 进行设置。当然只有新生成的水位线不为空并且大于上一次生成的水位线,新水位线才会被发出。

生成新的水位线的逻辑完全是由用户自己定义的。最简单的水位线生成算法就是取目前为止最大的事件时间。当然这种算法比较暴力,容易水位线提升突涨(这个最大时间戳可能过大),因此该算法对乱序事件的容忍程度比较低,容易出现大量迟到事件。当然我们用的最多的是KeyedWindow,一个Window往往有多个输入,而Window算子会选择其中最小的一个。

通过数据流中某些特殊标记事件来触发新水位线的生成。

虽然水位线指示着早于它的事件不应该再出现,但是在实际情况中,水位线生成算法,往往生不成完美水位线,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

1、重新激活已经关闭的窗口并重新计算以修正结果。

2、将迟到事件收集起来另外处理。

3、将迟到事件视为错误消息并丢弃。

Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side Output和Allowed Lateness。

Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

窗口是flink处理无限流的核心,窗口将流拆分为有限大小的“桶”,我们可以在这些桶上进行计算。

根据上游数据是否为Keyed Stream类型(是否将数据按照某个指定的Key进行分区),将窗口划分为Keyed Window和Non-Keyed Windows。两者的区别在于KeyStream调用相应的window()方法来指定window类型,数据会根据Key在不同的Task中并行计算,而Non-Keyed Stream需要调用WindowsAll()方法来指定window类型,所有的数据都会在一个Task进行计算,相当于没有并行。

窗口分配器负责将一个事件分配给一个或多个窗口,内置窗口包括: 滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)、会话窗口(Session Windows)、全局窗口(Global Windows),也可以通过继承WindowAssigner类来自定义窗口。

Flink中所有的内置窗口(全局窗口除外)都有基于时间的实现,这个时间可以是事件时间(event time),也可以是处理时间(processing time)。其中,处理滚动窗口和滑动窗口的算子,在112版本之前使用timeWindow(),在112版本被标记为 废弃 ,转而使用window()来作为窗口处理算子,这里只介绍最新版本的使用算子。

由于Flink默认使用的时间基准是UTC±00:00时间,在中国需要使用UTC+08:00时间,所以最后一个示例中窗口大小为1天,时间偏移量就是8小时。

最后一个示例,中的Timehours(-8)含义与滚动窗口一致。从滑动窗口的使用来看,滚动窗口其实是滑动窗口的一个特例,但窗口大小和滑动间隔相等的时候,滑动窗口就是一个滚动窗口。

动态的会话gap需要实现SessionWindowTimeGapExtractor接口。

基于计数的窗口是根据事件的个数来对窗口进行划分的,概念跟基于时间的滚动窗口差不多,只不过窗口大小的划分,有时间变成了事件的个数。

全局窗口分配器将所有具有相同key的元素分配到同一个全局窗口中,这个窗口模式仅适用于用户还需自定义触发器的情况。否则,由于全局窗口没有一个自然的结尾,无法执行元素的聚合,将不会有计算被执行。

使用示例如下:

触发器决定了一个窗口何时可以被窗口函数处理,每一个窗口分配器都有一个默认的触发器,如果默认的触发器不能满足你的需要,你可以通过调用trigger()来指定一个自定义的触发器。触发器的接口有5个方法来允许触发器处理不同的事件:

Flink有一些内置的触发器:

GlobalWindow默认的触发器是NeverTrigger,是永远不会触发的,因此,如果你使用的是GlobalWindow的话,需要定义一个自定义触发器。

Flink的窗口模型允许指定一个除了WindowAssigner和Trigger之外的可选参数Evitor,这个可以通过调用evitor()方法来实现。这个驱逐器(evitor)可以在触发器触发之前或者之后清理窗口中的元素。为了达到这个目的,Evitor接口有两个方法:

注:指定一个Evitor要防止预聚合,因为窗口中的所有元素必须得在计算之前传递到驱逐器中

Flink 提供了 Metric 系统,允许收集 Metric 并暴露给外部系统。

可以通过任何继承了 RichFunction 的函数访问 Metric 系统。调用 getRuntionContext()getMetricGroup() 方法,该方法返回一个 MetricGroup 对象,可以创建并注册 Metric。

Counter 用来计数。当前值可以使用 inc() / inc(long n) 或 dec() / dec(long n) 进行增减。

Gauge 根据需要提供任何类型的值。需要先创建一个实现 orgapacheflinkmetricsGauge 的类,返回值的类形没有限制。

Report 程序在暴露数据给外部系统时,会把对象转换为字符串,这意味着需要一个有意义的 toString() 实现。

Histogram 统计值的分布。

Flink 没有提供 Histogram 的默认实现,可以添加依赖使用 DropwizardHistogramWrapper 实现

Meter 用来统计平均吞吐量。

同样添加 flink-metrics-dropwizard 依赖,可以使用 DropwizardMeterWrapper 实现

每个 Metric 都会分配一个标识符和一组键值对,用来报告 Metric。

标识符基于3个组成部分:注册时的用户定义名称、可选的用户定义 Scope 和系统提供的 Scope。例如,如果 AB 是系统 Scope,CD 是用户 Scope,E 是名称,那么标识符将是 ABCDE。

可以通过在 conf/flink-confyaml 中设置 metricsscopedelimiter 键来配置用于标识符的分隔符(默认值:)。

定义 User Scope 的方法: 调用 MetricGroup#addGroup(String name) , MetricGroup#addGroup(int name) , MetricGroup#addGroup(String key, String value) 。这些方法会影响 MetricGroup#getMetricIdentifier 和 MetricGroup#getScopeComponents 的返回值。

System Scope 包含 Metric 的上下文信息,例如注册在哪个 Task(<task_name>)或属于哪个 Job(<job_name>)。

应该包含哪些上下文信息可以通过 conf/flink-confyaml 配置。

<host> | <job_name> | <tm_id> | <task_name> | <operator_name> | <subtask_index> 可以作为变量使用。变量的数量或顺序没有限制,区分大小写。

例如:Operator Metric 的默认 Scope 格式为 <host>taskmanager<tm_id><job_name><operator_name><subtask_index> ,生成的标识符类似 localhosttaskmanager1234MyJobMyOperator0MyMetric 的形式;如果希望包含 Task 名称,并且忽略 TaskManager 信息,可以设置 metricsscopeoperator: <host><job_name><task_name><operator_name><subtask_index> ,生成的标识符会变成 localhostMyJobMySource_->_MyOperatorMyOperator0MyMetric 。

建议添加带有 ID 的变量(如:<job_id>)保证唯一性,避免出现命名冲突的问题。所有可以使用的变量:

Flink 允许向外部系统报告 Metric。

通过在 conf/flink-confyaml 中配置一个或多个 Reporter,可以将 Metric 暴露给外部系统。这些 Reporter 在启动时实例化。

Reporter 必须至少配置 class 或 factoryclass 属性(使用哪个取决于 Reporter 的实现)。

配置 Reporter 示例

自定义 Reporter:

下面列出了一些支持的 Reporter

orgapacheflinkmetricsjmxJMXReporter

参数:

通过 JMX 公开的 Metric 由一个 domain 和一组 key 属性组成标识。domain 总是以 orgapacheflink 开始,接一个通用 metric 标识(与一般的 metric 标识不同,不受 scope 格式的影响,不包含任何变量),例如:orgapacheflinkjobtasknumBytesOut。

key 属性列表包含与给定 Metric 关联的所有变量的值(不受 scope 格式影响)。例如: host=localhost,job_name=MyJob,task_name=MyTask 。

orgapacheflinkmetricsprometheusPrometheusReporter

参数:

Flink Metric 类型和 Prometheus Metric 类型映射

orgapacheflinkmetricsprometheusPrometheusPushGatewayReporter

参数

PrometheusPushGatewayReporter 将 Metric 推到 Pushgateway

默认情况下,Flink 收集的指标

代替 Network/IO 部分 Metrics

如果启用了 Reactive Mode (113 MVP 特性),这些 Metric(除 numRestarts)不能正常工作。

如果启用了 Reactive Mode (113 MVP 特性),Job Scope 的 Metric 不能正常工作。

Flink 允许跟踪在系统中传输的记录的延迟。默认情况下禁用此功能。要启用延迟跟踪,必须在 Flink 配置( conf/flink-confyaml )或 ExecutionConfig 中将 latencyTrackingInterval 设置为正数。

Source 会定期(latencyTrackingInterval)发出一个特殊的记录,称为 LatencyMarker。记录包含一个时间戳,该时间戳从记录在源处发出时算起。LatencyMarker 不能超过(overtake)正常记录,因此如果正常记录在 Operator 前排队,将增加标记跟踪的延迟。

延迟监控的粒度,分为以下3档:

需要注意:

Metrics 可以通过 REST API 查询。下面列出一些可用的 Endpoint 和 JSON 返回格式。

Base URL: >

java所有数据类型对应的字节大小

java对象的组成 : 对象头,实例数据,对齐部分

jvm 序列化缺点

上面图为TaskManager内存模型,左边为细分的内存模型,右边为整体内存模型,该图摘自Flink官网

heap内存在jvm启动的时候申请的一块不变的内存区域,该内存实际上是Flink和task公用的一块区域,在flink层面通过控制来区分框架使用和task内存,heap内存管理起来是比较容易的,实际上non-heap的内存是难管理的一块,如果管理不当或者使用不当可能造成内存泄漏或者内存无限增长等问题

内存参数配置

在flink中对内存进行了抽象成了MemorySegment,�默认情况下,一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆上内存( byte数组) ,也可以是堆外内存(nio的ByteBufferr )

同时MemorySegment也提供了对二进制数据的 *** 作方法,以及读取字节数组序列化以及序列化字节数组的方法等

下面是类继承图,该类有两MemorySegment实现类有两个分别为使用heap的以及混合的即有heap和non-heap,对于内存的访问有子类具体的实现

MemorySemgent是flink内存分配的最小单元了,对于数据夸MemorySemgent保存,那么对于上层的使用者来说,需要考虑考虑所有的细节,由于过于繁琐,所以在MemorySemgent上又抽象了一层内存也,内存也是在MemorySemgent数据访问上的视图,对数据输入和输出分别抽象为DataInputView/DataOutputView,有了这一层,上层使用者无需关心跨MemorySemgent的细节问题,内存也对自动处理跨MemorySemgent的内存 *** 作

DataInputView

DataInputView继承DataInput,DataInputView是对MemorySemgent读取的抽象视图,提供一系列读取二进制数据不同类型的方法,AbstractPageInputView是DataInputView的一个抽象实现类,并且基本所有InputView都实现了该类,即所有实现该类的InputView都支持Page

InputView持有了多个MemorySemgent的引用(可以基于数组,list,deque等),这些MemorySemgent被视为一个内存页,可以顺序,随机等方式读取数据,要基于不同的实现类,实现类不同读取方式不同

方法图

DataOutputView

与DataInputView相对应,继承Output,并有一个拥有Page功能的抽象类(AbstractPagedOutputView),其大部outputView的实现都是继承自该抽象类,对一组MemorySemgent提供一个基于页的写入功能

方法图

类继承图

用于网络io数据的包装,每个buffer持有一个MemorySegment的引用,resultPartition写数据的时候,会向LocalBufferPool申请Buffer,会返回BufferBuilder,通过BufferBuilder想Buffe r<实际写入的是MemorySegment> 写数据

BufferBuilder是在上游Task中,负责想Buffer写入数据,BufferConsumer位于下游,与BufferBuilder相对应,用于消费Buffer的数据,每个bufferBuilder对应一个bufferConsumer

常用参数介绍

buffer申请

buffer回收

当buffer用完之后需要进行回收比如在netty的clientHandler收到响应之后进行处理就会把buffer回收掉,buffer回收之后并不会释放memorySegment,而是放回池中,变为可用内存,反复使用

flink托管的内存,托管内存使用堆外内存,用于批处理缓存排序等以及提供rocksDB内存

NetworkBufferPool是一个固定大小的MemorySegment实例吃,用于网络栈中,NettyBufferPool会为每个ResultPartition创建属于自己的LocalBufferPool,NettyBufferPool会作为全局的pool来提供内存,LocalBufferPool会通过限制来控制自己内存的申请,防止过多申请

LocalBufferPool继承关系,实现了bufferRecycler的接口,用于回收自己持有的buffer

在数据接收的时候会将数据封装成NettyBuffer,在数据发送的时候会通过BufferBilder向MemorySegment写入数据,然后通过BufferConsumer读取MemorySegment的数据

BufferManager主要用于为RemoteInputChannel提供buffer的,bufferManager在启动的时候会向全局bufferPool请求自己的独有buffer,当bufferManager的buffer不够的时候,则会向localBufferPool请求buffer,此时请求的buffer为浮动buffer

实际上提供的buffer是

以上就是关于Flink on YARN时,如何确定TaskManager数全部的内容,包括:Flink on YARN时,如何确定TaskManager数、Flink并行度可以有如下几种指定方式、Flink之WaterMarker详解等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zz/9751553.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-01
下一篇2023-05-01

发表评论

登录后才能评论

评论列表(0条)

    保存