Spark集群上的“局部性级别“是什么意思

uelo1irk  于 2023-02-09  发布在  Apache
关注(0)|答案(2)|浏览(168)

标题“局部性级别”和5个状态“数据局部”--〉“过程局部”--〉“节点局部”--〉“机架局部”--〉“任何”是什么意思?

flvlnr44

flvlnr441#

据我所知,本地化级别表示对数据进行了哪种类型的访问。当一个节点完成了所有工作,CPU空闲时,Spark可能会决定启动其他需要从其他地方获取数据的挂起任务。因此,理想情况下,所有任务都应该是本地处理的,因为这与较低的数据访问延迟有关。
您可以使用以下命令配置移动到其他区域级别之前的等待时间:

spark.locality.wait

有关参数的详细信息,请参见Spark Configuration docs
对于不同的级别PROCESS_LOCALNODE_LOCALRACK_LOCALANY,我认为**org.apache.spark.scheduler.TaskSetManager中的方法findTaskfindSpeculativeTask**说明了Spark如何基于任务的本地级别来选择任务。
首先,它将检查要在同一个执行器进程中启动的PROCESS_LOCAL任务。如果不是,它将检查可能在同一节点的其他执行器中的NODE_LOCAL任务,或者需要从HDFS、缓存等系统中检索的NODE_LOCAL任务。RACK_LOCAL表示数据在另一个节点中,因此需要在执行之前传输。最后,ANY只是接受可能在当前节点中运行的任何挂起任务。

/**
 * Dequeue a pending task for a given node and return its index and locality level.
 * Only search for tasks matching the given locality constraint.
 */
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] =
{
  for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
    for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL))
    }
  }
    
  if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- findTaskFromList(execId, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL))
    }
  }

  // Look for no-pref tasks after rack-local tasks since they can run anywhere.
  for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
    for (index <- findTaskFromList(execId, allPendingTasks)) {
      return Some((index, TaskLocality.ANY))
    }
  }

  // Finally, if all else has failed, find a speculative task
  findSpeculativeTask(execId, host, locality)
}
qmb5sa22

qmb5sa222#

这里是我的两分钱,我总结了大部分从spark official guide
首先,我想再添加一个局部性级别NO_PREF,这在this thread中已经讨论过。
然后,我们将这些级别放在一个表中,

值得注意的是,特定的水平可以跳过根据指南从Spark配置。
例如,如果要跳过NODE_LOCAL,只需将spark.locality.wait.node设置为0。

相关问题