Spark 并行度

Spark 并行度,第1张

数据学习交流微信群

Spark 并行度 是指一个 stage 下并行执行的 task 数量,由于一个 task 线程只能执行一个 rdd partition 分区 ,因此,Spark并行度与Spark作业执行性能息息相关。

假定给Spark 作业分配了足够的资源,比如有 50 个 executor ,每个 executor 有 3 个 cpu core ,这也意味最多可以有 503=150 个线程同时执行。

如果没有设置并行度,或者设置过小,比如并行度(或者说分区数)为 10 ,那么只会有 10个 线程来并行执行任务,剩余 140 个线程处于空闲状态,不仅造成严重的资源浪费,并且大大降低了作业性能。

换句话说,你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。

合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源。比如上面的例子,如果调整并行度为 150 ,则是最理想状态,每个线程都得以被利用,每个分区的数据量也大大减少,性能会得到明显提升。但是官方推荐的并行度是 executor cpu core 的 2-3倍 。这是因为,考虑到有些任务执行时间短暂,线程释放之后可以马上复用,运行下一个任务。当然,如果任务运行时间长,没有空闲线程,多余的分区必须要等待了。

指定并行度(分区数)的方法参见 Java Spark 简单示例(七) RDD分区 分区划分器

众所周知,GPU作为通用的加速硬件,被越来越广泛的用于图形图像处理、深度学习、高性能计算等领域, 效果显著。

ML/DL领域,Tensorflow, PyTorch 等深度学习框架大行其道,而 Spark 提供的 GraphX 和 MlLib 可以做一些机器学习的东西,但是在深度学习的战场里没有什么优势,最大的问题就在于硬件加速上,30 以前的社区版 Spark 没有调度 GPU 的方法。

训练模型除了本身的大规模的并行密集计算,从数据到模型,必须有数据处理的过程,这也是 Spark 的强项,因为你不太可能用 pandas 简单清洗汇总你的训练数据,而且做AI业务的企业往往已经部署有一套大数据平台环境。

Accelerator-aware task scheduling for Spark:SPARK-24615

Design sketch:SPARK-27005

2018年,Hadoop31 YARN已经支持GPU调度。

Apache Spark支持的资源管理器 YARN 和 Kubernetes 已经支持了 GPU。为了让 Spark 也支持 GPU,在技术层面上需要做出两个主要改变:

2020年6月,Spark300发布开始支持GPU调度。

在这个层面,我们得允许从 RDD/PandasUDF API 中指定资源请求,这些请求应该在 DAGScheduler 中汇总。TaskSetManager 管理每个 Stage 挂起(pending)的任务,对于那些有 GPU 请求的任务,我们需要处理;对于那些不需要 GPU 的作业,其调度行为和效率应该和之前保持一致。

目前,CPUS_PER_TASK(sparktaskcpus)是一个 int 类型的全局配置,用于指定每个 task 应分配的 cores。为了支持 GPU 的配置,引入了 sparktaskgpus 参数用于指定每个 task 需要申请的 GPU 数。如果用户没有指定 sparktaskcpus 或 sparktaskgpus,那么 Spark 程序将使用默认的值;因为需要向后兼容,所以如果用户没指定 sparktaskcpus 或 sparktaskgpus ,这两个参数的默认值分别为 1 和 空。

对于 ExecutorBackend ,需要使得它可以识别和管理 GPU ,并且把这些信息同步(比如修改现有的 RegisterExecutor 类)到 SchedulerBackend,然后 SchedulerBackend 可以根据这些 GPU 信息,为那些需要 GPU 资源的 task 进行资源分配。

第一阶段将在 Standalone、YARN 以及 Kubernetes 上支持 GPU。Spark 需要在这三种资源管理上面做一些工作。

Standalone 是 Spark 内置的资源管理模式,但是之前的 Standalone 部署模式并不能支持 GPU 等资源。为了能识别 GPU 信息,一种可行的方法是在配置文件里面对 GPU 资源进行配置, Worker 通过读取这些配置信息,并在内存结构里面维护 GPU 和 CPU 等可用资源等信息。同时,在 Master 上通过 allocateWorkerResourceToExecutors 方法对 Executors 申请的资源(包括 GPU)进行分配。

为了能够在 YARN 上支持 GPU,我们需要使用 YARN 312+ 版本;同时我们需要在 YARN 集群上做出相关配置,使得 YARN 启动了对 GPU 资源的支持,关于如何在 YARN 上配置 GPU 资源,请参见 这里 。

当为 Executors 申请 YARN 容器时,Spark 需要在 YARN 容器请求中将 executor 所需的 GPU 数量映射到 yarnio/gpu 资源中。YARN 具有 GPU 隔离机制,所以无论是否使用 Docker 容器, 对未分配给 YARN 容器的 GPU 资源的使用将会被阻止。

需要注意的是,截至目前 YARN 仅支持 Nvidia GPU

从 Kubernetes 18 版本开始,Kubernetes 使用设备插件模型(device plugin model)来支持 GPU、高性能NIC,FPGA 等设备。目前 Kubernetes 支持 Nvidia 、AMD 和 Intel 的 GPU 设备 。在 Spark + k8s 里面为 task 指定 GPU 的数量和在 Standalone 或 YARN 模式里面一样。也是支持 sparktaskgpus 和 sparkexecutorgpus 的全局配置,也支持在 RDD stage 中为每个 task 设置。

GPU 和其他加速已广泛用于加速特殊工作负载,例如深度学习和信号处理。Spark 现在支持请求和调度通用资源,例如 GPU。

Spark 将使用指定的配置首先从集群管理器请求具有相应资源的容器。一旦获得容器,Spark 就会在该容器中启动一个 Executor,它将发现容器拥有哪些资源以及与每个资源关联的地址。Executor 将向 Driver 注册并报告该 Executor 可用的资源。然后 Spark 调度器可以将任务调度到每个 Executor 并根据用户指定的资源需求分配特定的资源地址。

注意: 它目前不适用于 Mesos 或本地模式。

阶段级别调度功能(SPARK-27495)允许用户在阶段级别指定任务和执行器资源需求。这允许不同阶段与具有不同资源的执行程序一起运行。一个典型的例子是一个 ETL 阶段,执行器只使用 CPU,下一个阶段是需要 GPU 的 ML 阶段。阶段级调度允许用户在 ML 阶段运行时请求具有 GPU 的不同执行器,而不必在应用程序开始时获取具有 GPU 的执行器,并且在 ETL 阶段运行时它们处于空闲状态。

这仅适用于 Scala、Java 和 Python 中的 RDD API 。当启用动态分配时,它在 YARN 和 Kubernetes 上可用。

该功能在Spark300版本还没有实现,在Spark31版本才有。

YARN 310 中添加了 YARN 上的GPU资源调度。理想情况下,资源是隔离设置的,以便执行器只能看到分配给它的资源。如果您没有启用隔离,则用户负责创建一个发现脚本,以确保资源不会在执行程序之间共享。

YARN 目前支持任何用户定义的资源类型,但内置了 GPU ( yarnio/gpu ) 和 FPGA ( yarnio/fpga ) 类型。因此,如果您正在使用这些资源中的任何一个,Spark 可以将对 spark 资源的请求转换为 YARN 资源。

YARN 不会告诉 Spark 分配给每个容器的资源地址。出于这个原因,用户必须指定一个发现脚本,该脚本在启动时由executor运行,以发现该executor可用的资源。可以在 examples/src/main/scripts/getGpusResourcessh 中找到示例脚本。

启用动态分配(dynamic allocation)时,YARN 支持 阶段级调度

一个简单的资源spark-shell GPU配置启动:

对于这种配置,我们期望在 Executor 上一次运行不超过两个任务,因为每个任务将使用一个 GPU,并且每个 Executor 最多有两个 GPU。

Spark 2时代,正如我之前提到的,传统上在与机器学习相关的管道中,数据准备,在一个 CPU 集群上,由 Spark 编排,这必然会被序列化到共享存储,然后是一个单独的GPU集群将实际加载序列化数据备份并使用它进行训练,如 Tensorflow。

有了 Spark 3,我们终于有了一个统一的架构。我们有一个单一的管道,具备阶段级调度,我们实际上可以将它作为一个应用程序进行调度,我们可以进行数据摄取、准备和模型训练,所有这些都由 Spark 编排,单一平台,为 AI 而生。

计算负载主要由 Executors 承担,Driver 主要负责分布式调度,调优空间有限,因此对 Driver 端的配置项我们不作考虑

通过如下参数就可以明确有多少 CPU 资源被划拨给 Spark 用于分布式计算。

sparkcoresmax 集群

sparkexecutorcores Executor

sparktaskcpus 计算任务

并行度

sparkdefaultparallelism 并行度

sparksqlshufflepartitions 用于明确指定数据关联或聚合 *** 作中 Reduce 端的分区数量。

在平衡 Execution memory 与 Storage memory 的时候,如果 RDD 缓存是刚需,我们就把 sparkmemorystorageFraction 调大,并且在应用中优先把缓存灌满,再把计算逻辑应用在缓存数据之上。除此之外,我们还可以同时调整 sparkrddcompress 和 sparkmemorystorageFraction 来缓和 Full GC 的冲击

sparklocaldir 这个配置项,这个参数允许开发者设置磁盘目录,该目录用于存储 RDD cache 落盘数据块和 Shuffle 中间文件。如果你的经费比较充裕,有条件在计算节点中配备足量的 SSD 存储,甚至是更多的内存资源,完全可以把 SSD 上的文件系统目录,或是内存文件系统添加到 sparklocaldir 配置项中去,从而提供更好的 I/O 性能。

Configuration - Spark 321

自 16 版本之后,Spark 统一采用 Sort shuffle manager 来管理 Shuffle *** 作,在 Sort shuffle manager 的管理机制下,无论计算结果本身是否需要排序,Shuffle 计算过程在 Map 阶段和 Reduce 阶段都会引入排序 *** 作。

在不需要聚合,也不需要排序的计算场景中,我们就可以通过设置 sparkshufflesortbypassMergeThreshold 的参数,来改变 Reduce 端的并行度(默认值是 200)。当 Reduce 端的分区数小于这个设置值的时候,我们就能避免 Shuffle 在计算过程引入排序。

Spark SQL 下面的配置项还是蛮多的,其中对执行性能贡献最大的,当属 AQE(Adaptive query execution,自适应查询引擎)引入的那 3 个特性了,也就是自动分区合并、自动数据倾斜处理和 Join 策略调整。

AQE 事先并不判断哪些分区足够小,而是按照分区编号进行扫描,当扫描量超过“目标尺寸”时,就合并一次,那么,“目标尺寸”由什么来决定的呢?Spark 提供了两个配置项来共同决定分区合并的“目标尺寸”,分区合并的目标尺寸取 advisoryPartitionSizeInBytes 与 partitionSize (每个分区的平均大小)之间的最小值。

我们来举个例子。假设,Shuffle 过后数据大小为 20GB,minPartitionNum 设置为 200,反推过来,每个分区的尺寸就是 20GB / 200 = 100MB。再假设,advisoryPartitionSizeInBytes 设置为 200MB,最终的目标分区尺寸就是取(100MB,200MB)之间的最小值,也就是 100MB。因此你看,并不是你指定了 advisoryPartitionSizeInBytes 是多少

首先,分区尺寸必须要大于 sparksqladaptiveskewJoinskewedPartitionThresholdInBytes 参数的设定值,才有可能被判定为倾斜分区。然后,AQE 统计所有数据分区大小并排序,取中位数作为放大基数,尺寸大于中位数一定倍数的分区会被判定为倾斜分区,中位数的放大倍数也是由参数 sparksqladaptiveskewJoinskewedPartitionFactor(默认值是 5 倍) 控制。

实际上指的是,把会引入 Shuffle 的 Join 方式,如 Hash Join、Sort Merge Join,“降级”(Demote)为 Broadcast Join。

在 Spark 发布 AQE 之前,开发者可以利用 sparksqlautoBroadcastJoinThreshold 配置项对数据关联 *** 作进行主动降级。这个参数的默认值是 10MB,参与 Join 的两张表中只要有一张数据表的尺寸小于 10MB

不过,autoBroadcastJoinThreshold 这个参数虽然好用,但是有两个让人头疼的短板。一是可靠性较差。尽管开发者明确设置了广播阈值,而且小表数据量在阈值以内,但 Spark 对小表尺寸的误判时有发生,导致 Broadcast Join 降级失败。二来,预先设置广播阈值是一种静态的优化机制,它没有办法在运行时动态对数据关联进行降级调整。

AQE 很好地解决了这两个头疼的问题。首先,AQE 的 Join 策略调整是一种动态优化机制,对于刚才的两张大表,AQE 会在数据表完成过滤 *** 作之后动态计算剩余数据量,当数据量满足广播条件时,AQE 会重新调整逻辑执行计划,在新的逻辑计划中把 Shuffle Joins 降级为 Broadcast Join。再者,运行时的数据量估算要比编译时准确得多,因此 AQE 的动态 Join 策略调整相比静态优化会更可靠、更稳定。

每个 Map Task 生成的数据文件,都包含所有 Reduce Task 所需的部分数据。因此,任何一个 Reduce Task 要想完成计算,必须先从所有 Map Task 的中间文件里去拉取属于自己的那部分数据。索引文件正是用于帮助判定哪部分数据属于哪个 Reduce Task。Reduce Task 通过网络拉取中间文件的过程,实际上就是不同 Stages 之间数据分发的过程。

显然,Shuffle 中数据分发的网络开销,会随着 Map Task 与 Reduce Task 的线性增长,呈指数级爆炸。

Shuffle Joins

第一步就是对参与关联的左右表分别进行 Shuffle,Shuffle 的分区规则是先对 Join keys 计算哈希值,再把哈希值对分区数取模。Shuffle 完成之后,第二步就是在同一个 Executors 内,Reduce task 就可以对 userID 一致的记录进行关联 *** 作。

Broadcast Join

使用广播阈值配置项让 Spark 优先选择 Broadcast Joins 的关键,就是要确保至少有一张表的存储尺寸小于广播阈值(数据表在磁盘上的存储大小,同一份数据在内存中的存储大小往往会比磁盘中的存储大小膨胀数倍)

Spark 将内存分成了 Execution Memory 和 Storage Memory 两类,分别用于分布式任务执行和 RDD 缓存。其中,RDD 缓存虽然最终占用的是 Storage Memory,但在 RDD 展开(Unroll)之前,计算任务消耗的还是 Execution Memory。因此,Spark 中 CPU 与内存的平衡,其实就是 CPU 与执行内存之间的协同与配比。

并行度指的是为了实现分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。并行度可以通过两个参数来设置,分别是 sparkdefaultparallelism 和 sparksqlshufflepartitions。前者用于设置 RDD 的默认并行度,后者在 Spark SQL 开发框架下,指定了 Shuffle Reduce 阶段默认的并行度。并发度呢?Executor 的线程池大小由参数 sparkexecutorcores 决定,每个任务在执行期间需要消耗的线程数由 sparktaskcpus 配置项给定。两者相除得到的商就是并发度,也就是同一时间内,一个 Executor 内部可以同时运行的最大任务数量。又因为,sparktaskcpus 默认数值为 1,并且通常不需要调整,所以,并发度基本由 sparkexecutorcores 参数敲定。就 Executor 的线程池来说,尽管线程本身可以复用,但每个线程在同一时间只能计算一个任务,每个任务负责处理一个数据分片。因此,在运行时,线程、任务与分区是一一对应的关系。

对于 User Memory 内存区域来说,使用 空间去重复存储同样的数据,本身就是降低了内存的利用率

对于存储级别来说,实际开发中最常用到的有两个,MEMORY_ONLY 和 MEMORY_AND_DISK,它们分别是 RDD 缓存和 DataFrame 缓存的默认存储级别。对于缓存计算来说,它分为 3 个步骤,第一步是 Unroll,把 RDD 数据分片的 Iterator 物化为对象值,第二步是 Transfer,把对象值封装为 MemoryEntry,第三步是把 BlockId、MemoryEntry 价值对注册到 LinkedHashMap 数据结构。另外,当数据缓存需求远大于 Storage Memory 区域的空间供给时,Spark 利用 LinkedHashMap 数据结构提供的特性,会遵循 LRU 和兔子不吃窝边草这两个基本原则来清除内存空间:LRU:按照元素的访问顺序,优先清除那些“最近最少访问”的 BlockId、MemoryEntry 键值对兔子不吃窝边草:在清除的过程中,同属一个 RDD 的 MemoryEntry 拥有“赦免权”

PROCESS_LOCAL:任务与数据同在一个 JVM 进程中

NODE_LOCAL:任务与数据同在一个计算节点,数据可能在磁盘上或是另一个 JVM 进程中

RACK_LOCAL:任务与数据不在同一节点,但在同一个物理机架上

ANY:任务与数据是跨机架、甚至是跨 DC(Data Center,数据中心)的关系访问数据源是否会引入网络开销,取决于任务与数据的本地性关系,也就是任务的本地性级别

Shuffle 作为大多数计算场景的“性能瓶颈担当”,确实是网络开销的罪魁祸首。根据“能省则省”的开发原则,我们自然要想尽办法去避免 Shuffle。

在数据通过网络分发之前,我们可以利用 Kryo Serializer 序列化器,提升序列化字节的存储效率,从而有效降低在网络中分发的数据量,整体上减少网络开销。

部署这种模式,需要修改conf目录下的spark-envsh文件。在其中新增如下配置选项: export HADOOP_HOME= /home/hadoop/hadoop-200-cdh450 export HADOOP_CONF_DIR= $HADOOP_HOME/etc/hadoop SPARK_EXECUTOR_INSTANCES=2 SPARK_EXECUTOR_CORES

以上就是关于Spark 并行度全部的内容,包括:Spark 并行度、Spark原生GPU调度的前世今生、2022-02-24-Spark-44(性能调优通用调优)等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/10146234.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-05
下一篇2023-05-05

发表评论

登录后才能评论

评论列表(0条)

    保存