当我们写sql实现数据需求的时候,得多提醒自己,sql里的每个字段,

不管是聚合、还是join;

不管它们占的空间有多大;

不管它们是int类型的,还是一个大json串;

不管实现整个聚合的计算用的是HashAgg 、是ObjectHashAgg 或者是SortAgg;

不管实现整个JOIN的计算用的是SortMergeJoin 、是ShuffleHashJoin 、还是BroadcastHashJoin...

sql中涉及到的数据,都是要在内存里走一趟的,所以对内存的理解是一个非常重要的事情,理解的越透彻,我们在解决实际问题时,就会越清晰。

之前总是在学习sparksql的源码,接下来,打算从内存的角度,做一系列的总结,加深对spark的理解,毕竟未来几年,还是要继续和spark打交道。

内存系列会涉及到Executor整体内存的构成、我们平时常用的窗口函数、聚合函数、JOIN等,内容大概如下:

Spark on YARN Executor整体内存理解及Trouble Shooting 窗口函数内存使用理解,年前貌似写过一篇 窗口函数为什么更容易出现性能问题?——一个优化案例 聚合函数内存使用理解 JOIN内存使用理解

那下面就开始吧

1、Executor内存构成

从代码里看到,Executor内存组成如果下图:

第一层,整个Executor是YARN的一个container,而

单个container可申请的最大内存受到yarn.scheduler.maximum-allocation-mb参数限制,所以Executor的总内存受到yarn.scheduler.maximum-allocation-mb参数控制

Executor中的内存分为两个部分,一部分被JVM管理,我们标记为JVM(堆内);另一部分不被JVM管理,我们标记为OUT OF JVM(堆外)

先说OUT OF JVM吧~

spark有这么多种堆外的参数,一开始,我也疑惑,它们的区别是什么?各自的作用又是啥?该怎么理解呢?

1.1 【堆外】OUT OF JVM

按上图,从右往左

1.1.1 【基本不操作】spark.memory.offHeap.size

Spark 1.6 开始引入了Off-heap memory(SPARK-11389)。这种模式不在 JVM 内申请内存,而是调用 Java 的 unsafe 相关 API 进行诸如 C 语言里面的 malloc() 直接向操作系统申请内存,由于这种方式不经过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑。

从代码上来看,引入这块,主要是为了支持Tungsten项目,Tungsten项目致力于提升Spark程序对内存和CPU的利用率,使性能达到硬件的极限。

Tungsten项目主要包括:

Memory Management and Binary Processing: off-heap管理内存,降低对象的开销和消除JVM GC带来的延时,通过JVM提供的sun.misc.Unsafe API来实现 (UnsafeRow) Cache-aware computation: 优化存储,提升CPU L1/ L2/L3缓存命中率,主要应用在UnsafeExternalSorter和UnsafeInMemorySorter的实现上 Code generation: 优化Spark SQL的代码生成部分(WSCG),提升CPU利用率。关于WSCG,之前有篇文章做了一些总结 SparkSql全代码生成规则梳理-CollapseCodegenStages

这个参数默认是关闭的,所以,我们在日常跑任务时Tungsten项目并没有真正的实现使用堆外内存。可以通过设置spark.memory.offHeap.enabled=true 启用,并且通过 spark.memory.offHeap.size 设置堆外内存大小,单位是字节

1.1.2 【基本不操作】spark.executor.pyspark.memory

这个参数,用来对pyspark的executor内存进行限制,主要用于执行python上编写的UDF

默认不设置。如果设置的话,pyspark的使用内存不能大于这个值;如果不设置,这块内存的使用包含在spark.executor.memoryOverhead中

1.1.3 【偶尔操作】spark.executor.memoryOverhead

在 YARN,K8S 部署模式下,container 会预留一部分内存,形式是堆外,用来保证稳定性,主要用于创建Java Object时的额外开销,Native方法调用,线程栈, NIO Buffer等;

比如:Spark的shuffle部分使用了netty框架进行网络传输,但netty会申请堆外内存缓存(PooledByteBufAllocator ,AbstractByteBufAllocator)

参数说明

spark.executor.memoryOverhead,Spark 2.3 前,这个参数名为:spark.yarn.executor.memoryOverhead

spark.yarn.executor.memoryOverhead这个参数还没有完全被废弃,在未来会被废弃。

目前开源版本(branch3.3)的处理逻辑是:如果配制了spark.executor.memoryOverhead参数,就用spark.executor.memoryOverhead,如果没有配制spark.executor.memoryOverhead,但配制了spark.yarn.executor.memoryOverhead,会取spark.yarn.executor.memoryOverhead的值来做为spark.executor.memoryOverhead,并给出警告

参数设置

384 MB 与 executorMemory * 0.10之间取最max,对应代码如下:

1.1.4 两个堆外参数的区别(memoryOverhead 、offHeap.size)

可以这么理解,

spark.executor.memoryOverhead 是spark中广义的堆外内存,for yarn资源manager,作用比较杂(代码缓存、线程栈、SparkR、pyspark...在spark 2.4.5及之前的版本,spark.executor.memoryOverhead也包含spark.memory.offHeap.size ); 而spark.memory.offHeap.size 更像是spark中狭义的堆外内存,for spark mem  manager, 作用更集中:为了Tungsten项目提高executor对内存使用的效率

1.2 【堆内】JVM

关于堆内内存,是我们最熟悉、最经常操作的,之前有从任务优化角度总结过一篇比较详细的文章:从一个sql任务理解spark内存模型

1.2.1 【经常操作】spark.executor.memory

理解

(网上的这个图片非常经典,对于新版本的spark堆内内存的原理仍然适用)

jvm堆内的内存分为四个部分(spark.memory.fraction=0.6)

(预留内存)reservedMemory: 预留内存300M,用于保障spark正常运行;如果Executor分配的内存小于 1.5 * 300 = 450M 时,Executor将无法执行

NOTES:为什么设置300M预留内存?

统一内存管理最初版本这部分内存没有固定值 300M 设置,而是设置的百分比,最初版本占 25%。百分比设置在实际使用中出现了问题,若给定的内存较低时,例如 1G,会导致 OOM,具体讨论参考这里 https://issues.apache.org/jira/browse/SPARK-12081。因此,这部分内存做了修改,先划出 300M 内存

​(用户/其他内存)user/other memory: 用于spark内部的一些元数据、用户的数据结构、防止在稀疏和异常大的记录的情况下出现对内存估计不足导致oom时的内存缓冲;(spark.executor.memory-300M)* 40% (执行内存)execution: 用于spark的计算:shuffle、sort、agg等这些计算时会用到的内存;(spark.executor.memory-300M)* 60%*50% (存储内存)storage(spark.memory.storageFraction=0.5): 主要用于rdd的缓存;(spark.executor.memory-300M)* 60%*50%

NOTES:spark.memory.fraction 由 0.75 降至 0.6 spark.memory.fraction 最初版本的值是 0.75,很多分析统一内存管理这块的文章也是这么介绍的,同样的,在使用中发现这个值设置的偏高,导致了 gc 时间过长,spark 2.0 版本将其调整为 0.6,详细谈论参见 https://issues.apache.org/jira/browse/SPARK-15796

从spark web ui看实际可用内存

在【Executors】页面的【Storage Memory】列,可以看到Executor的真正可以使用的内存,截图的案例 spark.executor.memory=9G,spark.memory.fraction=0.6,计算公式如下:

(spark.executor.memory-300M)*0.6 = (9*1024-300)*0.6/1024=5.2G

web页面计算的代码如下:

1.3 报错总结(trouble shooting)

Executor中task的内存分配:

Executor中task以线程的方式执行,各线程共享JVM的资源,因此,可能会出现:先到达的任务可能占用较大的内存,而后到的任务因得不到足够的内存而挂起。

每个任务可占用的内存大小为潜在可使用计算内存的1/2n – 1/n , 当剩余内存为小于1/2n时,任务将被挂起,直至有其他任务释放执行内存,而满足内存下限1/2n,任务被唤醒,其中n为当前Executor中活跃的任务数。

比如如果 Execution 内存大小为 10GB,当前 Executor 内正在运行的 Task 个数为5,则该 Task 可以申请的内存范围为 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范围。

任务执行过程中,如果需要更多的内存,则会进行申请;如果存在空闲内存,则自动扩容成功,否则,抛出OutOffMemroyError。

第一种情况:Executor 堆内存(M2)不足

 解决方案 增加单个task可使用内存的量

增加spark.executor.memory值,使每个Task可使用内存增加 降低Executor的可用core的数量spark.executor.cores参数的值 , 使Executor中同时运行的任务数减少,在总资源不变的情况下,使每个Task获得的内存相对增加 降低单个Task的内存消耗量

减少每个Task处理的数据量,在Spark中,每个partition对应一个处理任务Task, 因此,在数据总量一定的前提下,可以通过增加partition数量的方式来减少每个Task处理的数据量,从而降低Task的内存开销,一般是我们的AQE系列参数:spark.sql.adaptive.maxNumPostShufflePartitions... 调整代码逻辑

第二种情况:Executor 堆外内存(M1)不足

解决方案 出现该问题原因是由于实际使用内存上限超过申请的内存上限而被Yarn终止掉了,Yarn中Container内存监控机制:

Container进程的内存使用量:以Container进程为根的进程树中所有进程的内存使用总量 Container被杀死的判断依据:进程树总内存(物理内存或虚拟内存)使用量超过向Yarn申请的内存上限值,则认为该Container使用内存超量,可以被“杀死” 因此,对该异常的分析要从是否存在子进程两个角度出发: 1) 不存在子进程 根据Container进程杀死的条件可知,在不存在子进程时,出现killed by yarn问题是于由Executor(JVM)进程自身内存超过向Yarn申请的内存总量(M1+M2)所致。 如果是堆内内存(M2)不足,则会先报Java heap space OOM异常,因此可判定其为堆外内存( M1 )不足 2)存在子进程 Spark 应用中Container以Executor(JVM进程)的形式存在,因此根进程为Executor对应的进程, 而Spark 应用向Yarn申请的总资源M = M1  + M 2 , 都是以Executor(JVM)进程(非进程树)可用资源的名义申请的。 申请的资源并非一次性全量分配给JVM使用,而是先为JVM分配初始值,随后内存不足时再按比率不断进行扩容,直致达到Container监控的最大内存使用量M 。 当Executor中启动了子进程(pyspark等)时,子进程占用的内存(记为 S) 就被加入Container进程树,此时就会影响Executor实际可使用内存资源(Executor进程实际可使用资源为:M - S),然而启动JVM时设置的可用最大资源为M, 且JVM进程并不会感知Container中留给自己的使用量已被子进程占用,因此,当 JVM使用量达到 M - S,还会继续开劈内存空间,这就会导致Executor进程树使用的总内存量大于M 而被Yarn 杀死。 依据Yarn内存使用情况有如下两种方案:

如果,M1+M2未达到Yarn单个Container允许的上限(yarn.scheduler.maximum-allocation-mb )时,可增加M1(如果是PySpark,可增加spark.executor.pyspark.memory,也可以只增加spark.executor.memoryOverhead) ;如果,M达到Yarn单个Container允许的上限时,增加 M1, 降低 M2 降低Executor的可用core的数量 spark.executor.cores参数的值, 使并行任务数减少,从而减少Overhead开销

1.4 最后思考一个问题

spark.memory.offHeap.size参数默认是关闭的,我们很少去用它,那什么场景下更适合打开这个参数呢?

最近在尝试这一块呀

sparksql源码精读第三期直播,预计3月4号开始,需要的来~~

刚看了一下,有3.4版本了,这次以branch3.4来讲,方便我们了解sparksql最新的特性

 

推荐阅读:

Spark sql 生成PhysicalPlan(源码详解)

Spark sql规则执行器RuleExecutor(源码解析)

你真的了解Lateral View explode吗?--源码复盘

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: