spark学习之资源调度

x33g5p2x  于2022-03-25 转载在 Spark  
字(2.7k)|赞(0)|评价(0)|浏览(639)

💌前一篇博客中,我们学习了spark代码的执行过程,其中涉及到了逻辑执行计划和物理执行计划,今天我们主要来学习spark的资源调度的内容,对往期内容感兴趣的同学可以参考如下内容👇:

🐡说到spark的资源调度,我们主要关心的是执行计算任务节点的资源调度的设置,如单个Executor的核数、单个节点Executor的个数、单个Executor的内存大小等,接下来将会对这些进行详细的介绍。

1. 总体资源

我们这里假设单台服务器的内存大小为128g,32个线程。

1.1 Executor的核数

executor-cores的个数决定任务的并行度,也就是同时执行task的个数,一般情况下,executor-cores的个数设置为3~6个之间比较合适。

1.2 Executor的个数

这里的Executor的个数是指整个集群的Executor的个数个数,所以:
总 的 e x e c u t o r 个 数 = 每 个 节 点 的 e x e c u t o r 数 ∗ w o r k 节 点 数 总的 executor个数= 每个节点的 executor 数 * work 节点数总的executor个数=每个节点的executor数∗work节点数
那么单个节点的Executor个数如何计算,单个节点所能接受的最大Executor个数和yarn给的资源和Executor的核数有关,关系如下:

每 个 n o d e 的 e x e c u t o r 数 = 单 节 点 y a r n 总 核 数 每 个 e x e c u t o r 的 最 大 c p u 核 数 每个 node 的 executor 数 = \dfrac{单节点 yarn 总核数}{每个 executor 的最大 cpu 核数}每个node的executor数=每个executor的最大cpu核数单节点yarn总核数​

考虑到系统基础服务和 HDFS 等组件的余量,yarn的nodemanager资源配置为:28,参数 executor-cores 的值为:4,那么每个 node 的 executor 数 = 28/4 = 7,假设集群节点为 10,那么 num-executors = 7 * 10 = 70

1.3 Executor的内存

每个Executor的内存的大小也和yarn分配的资源有关系:
e x e c u t o r 内 存 大 小 = y a r n 总 内 存 大 小 每 个 节 点 的 e x e c u t o r 数 量 executor内存大小= \dfrac{yarn总内存大小}{每个节点的 executor 数量}executor内存大小=每个节点的executor数量yarn总内存大小​
例如:单个节点的 yarn 的参数配置为 100G,那么每个 Executor 大概就是 100G/7≈14G,同时要注意yarn 配置中每个容器允许的最大内存是否匹配,一般情况yarn默认配置的每个容器的内存大小范围为[1g,8g].

2. 内存资源

我们先来看一下spark的内存划分:

  • storage空间:用来存放cache、persist和广播变量的缓存数据 。(Storage 内存 = 广播变量 + cache/Executor 数量)
  • executor空间:主要负责执行计算过程中的内存开销,比如shuffle过程需要的内存。(Executor 内存 = 每个 Executor 核数 * (数据集大小/并行度))
  • other空间:自定义的数据结构和元数据,(Other 内存 = 自定义数据结构*每个 Executor 核数)

3. 持久化和序列化

我们先来介绍一下持久化和序列化在spark中的含义:

  • 持久化:在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(action)变得更加迅速(通常快10倍)。
  • 序列化:序列化是将对象的状态信息转换为可以存储或传输的形式的过程,主要目的有两个:1. 存储到磁盘; 2. 通过网络进行传输;而在spark中,算子相关的操作在Excutor上执行,算子之外的代码在Driver端执行,在执行有些算子的时候,需要只用到Driver里面定义的数据,这就涉及到了跨进程或着跨节点之间的通讯,所以要求传递给Excutor中的数组所属的类型必须实现Serializable接口。

3.1 RDD

我们使用rdd持久化时,默认 cache 缓存级别(memory_only),如果此时存储内存较小,可以采用kryo+序列化缓存,可以优化存储内存占用。

3.2 Dataset和Dataframe

我们使用dataset持久化时,默认cache缓存级别(memory_and_disk),即内存不够磁盘来凑,如果采用序列化的方式,实际效果和未序列化差别不大,因为Dataset和Dataframe是被优化过的rdd,所以开发中直接使用cache缓存即可。

总体来说,如果使用RDD进行持久化,建议采用kryo序列化+持久化的操作,如果使用Dataset和Dataframe直接使用cache持久化即可。从性能上来讲,DataSet,DataFrame 优于 RDD,建议开发中使用 DataSet、DataFrame。

4. CPU资源

首先来了解几个非常容易搞混的概念:

4.1 并行度

并行度指的就是task的数量,或者说分区数量

  • rdd的控制方法:spark.default.parallelism,根据算子计算决定
  • sql的控制方法:spark.sql.shuffle.partitions,默认200

4.2 并发度

并发度指的可同时执行的最大task数量,那这和并行度有何区别,首先并行度是指一个任务需要多少个分区,比如groupby之后产生了200个分区(200task),但是我们的executor只有2个核,即我们最多同时执行2个task,那么这个任务的并发度就是2,并行度是200。每次执行2个task,执行完后换上下一个task,一直到执行完200个。

4.3 合理利用CPU

下面两种情况会导致CPU效率过低:

  • 并行度较低、数据分片较大容易导致 CPU 线程挂起(处理数据量大)
  • 并行度过高、数据过于分散会让调度开销更多(task数量过多)

根据经验:一般会将并行度(task 数)设置成并发度 (vcore 数)的 2 倍到 3 倍。

这一部分的详细内容可参考: spark学习之并行度、并发、core数和分区的关系.

5. 总结

本博客主要介绍了spark运行过程中的一些资源调度的情况,以及如何配置资源调度使spark的运行效率得到提高,主要从内存、持久化、cpu资源等方面进行了介绍。

6. 参考文章

相关文章