/**
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality)
}
2条答案
按热度按时间flvlnr441#
据我所知,本地化级别表示对数据进行了哪种类型的访问。当一个节点完成了所有工作,CPU空闲时,Spark可能会决定启动其他需要从其他地方获取数据的挂起任务。因此,理想情况下,所有任务都应该是本地处理的,因为这与较低的数据访问延迟有关。
您可以使用以下命令配置移动到其他区域级别之前的等待时间:
有关参数的详细信息,请参见Spark Configuration docs
对于不同的级别
PROCESS_LOCAL
、NODE_LOCAL
、RACK_LOCAL
或ANY
,我认为**org.apache.spark.scheduler.TaskSetManager
中的方法findTask
和findSpeculativeTask
**说明了Spark如何基于任务的本地级别来选择任务。首先,它将检查要在同一个执行器进程中启动的
PROCESS_LOCAL
任务。如果不是,它将检查可能在同一节点的其他执行器中的NODE_LOCAL
任务,或者需要从HDFS、缓存等系统中检索的NODE_LOCAL
任务。RACK_LOCAL
表示数据在另一个节点中,因此需要在执行之前传输。最后,ANY
只是接受可能在当前节点中运行的任何挂起任务。qmb5sa222#
这里是我的两分钱,我总结了大部分从spark official guide。
首先,我想再添加一个局部性级别
NO_PREF
,这在this thread中已经讨论过。然后,我们将这些级别放在一个表中,
值得注意的是,特定的水平可以跳过根据指南从Spark配置。
例如,如果要跳过
NODE_LOCAL
,只需将spark.locality.wait.node
设置为0。