在spark中如何将阶段划分为任务?

zte4gxcn  于 2021-07-09  发布在  Spark
关注(0)|答案(3)|浏览(344)

让我们假设每个时间点只有一个spark作业在运行。

到目前为止我得到了什么

以下是我对spark的理解:
SparkContext 创建时,每个工作节点启动一个执行器。执行器是独立的进程(jvm),连接回驱动程序。每个执行器都有驱动程序的jar。放弃一个司机,关闭遗嘱执行人。每个执行器可以保存一些分区。
执行作业时,将根据沿袭图创建执行计划。
执行作业被分为多个阶段,其中阶段包含尽可能多的相邻(在沿袭图中)转换和操作,但不包含无序排列。因此,各阶段被洗牌分开。

我明白
任务是通过序列化函数对象从驱动程序发送给执行器的命令。
执行器(使用驱动程序jar)反序列化命令(任务)并在分区上执行它。
但是

问题

如何将阶段划分为这些任务?
明确地:
任务是由转换和操作决定的,还是一个任务中可以有多个转换/操作?
是由分区确定的任务(例如,每个分区每个阶段一个任务)。
任务是否由节点决定(例如,每个节点每个阶段一个任务)?

我的想法(仅部分回答,即使正确)

在https://0x0fff.com/spark-architecture-shuffle,用图像解释了洗牌

我觉得规则是
每个阶段被划分为#个分区任务,而不考虑节点的数量
对于我的第一张图片,我会说我有3个map任务和3个reduce任务。
对于来自0x0fff的图像,我认为有8个map任务和3个reduce任务(假设只有3个橙色和3个深绿色文件)。

任何情况下的开放性问题

对吗?但是,即使这是正确的,我上面的问题也没有全部得到回答,因为它仍然是开放的,不管多个操作(例如多个Map)是在一个任务中,还是在每个操作中分为一个任务。

别人怎么说

spark中的任务是什么?spark worker如何执行jar文件?apachespark调度器如何将文件分割成任务?类似的,但我觉得我的问题没有得到明确的回答。

h5qlskok

h5qlskok1#

这可能有助于您更好地理解不同的部分:
阶段:是任务的集合。同一进程针对不同的数据子集(分区)运行。
任务:表示分布式数据集分区上的工作单元。所以在每个阶段中,任务数=分区数,或者如您所说的“每个分区每个阶段一个任务”。
每个执行器在一个Yarn容器上运行,每个容器驻留在一个节点上。
每个阶段使用多个执行器,每个执行器分配多个vcore。
每个vcore一次只能执行一个任务
因此,在任何阶段,都可以并行执行多个任务。正在运行的任务数=正在使用的vCore数。

zy1mlcev

zy1mlcev2#

如果我理解正确,有两件(相关的)事情让你困惑:
1) 什么决定了任务的内容?
2) 什么决定了要执行的任务的数量?
spark的引擎将连续RDD上的简单操作“粘合”在一起,例如:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

因此,当rdd3被(延迟地)计算时,spark将为rdd1的每个分区生成一个任务,并且每个任务都将执行过滤器和每行的Map以得到rdd3。
任务的数量由分区的数量决定。每个rdd都有一个定义的分区数。对于从hdfs读取的源rdd(使用sc.textfile(…)例如)分区数是由输入格式生成的分割数。对rdd的某些操作可能会导致rdd具有不同数量的分区:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

另一个例子是连接:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(大多数)更改分区数的操作都涉及到一次洗牌,例如:

rdd2 = rdd1.repartition( 1000 )

实际上,rdd1的每个分区上的任务都需要生成一个可由下一阶段读取的最终输出,以便rdd2正好有1000个分区(它们是如何做到的?散列或排序)。这一边的任务有时被称为“Map(边)任务”。稍后在rdd2上运行的任务将作用于(rdd2的)一个分区并且必须弄清楚如何读取/组合与该分区相关的map-side输出。这方面的任务有时被称为“减少(侧)任务”。
这两个问题是相关的:一个阶段中的任务数是分区数(对于“粘合”在一起的连续rdd来说是公共的),一个rdd的分区数可以在两个阶段之间改变(例如,通过指定某个引起无序的操作的分区数)。
一旦阶段开始执行,它的任务就可以占用任务槽。并发任务槽的数量为numexecutors*executorcores。一般来说,这些任务可以被来自不同的、不相关的阶段的任务占用。

qni6mghb

qni6mghb3#

你的轮廓很好。回答你的问题
单独的 task 需要为每个分区的每个数据启动 stage . 考虑到每个分区可能位于不同的物理位置,例如hdfs中的块或本地文件系统的目录/卷。
请注意 Stage s是由 DAG Scheduler . 这意味着不相互依赖的阶段可以提交给集群并行执行:这最大限度地提高了集群的并行化能力。因此,如果我们的数据流中的操作可以同时发生,我们将期望看到多个阶段启动。
我们可以在下面的玩具示例中看到,我们执行以下类型的操作:
加载两个数据源
分别对两个数据源执行一些Map操作
加入他们
对结果执行一些Map和筛选操作
保存结果
那么我们最终会有多少个阶段呢?
每个阶段1个,用于并行加载两个数据源=2个阶段
第三阶段代表 join 这取决于其他两个阶段
注意:对联接数据进行的所有后续操作都可以在同一阶段中执行,因为它们必须按顺序进行。启动附加阶段没有任何好处,因为它们在先前的操作完成之前无法开始工作。
这是玩具程序

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

这是结果的dag

现在:有多少任务?任务数应等于
总计( Stage * #Partitions in the stage )

相关问题