Spark:以编程方式获取集群核心数

13z8s7eq  于 2022-12-04  发布在  Apache
关注(0)|答案(5)|浏览(176)

我在yarn cluster中运行我的spark应用程序。在我的代码中,我使用队列的可用内核数来创建数据集的分区:

Dataset ds = ...
ds.coalesce(config.getNumberOfCores());

我的问题:如何通过编程方式而不是通过配置方式获得队列中可用内核数?

bsxbgnwa

bsxbgnwa1#

你可以在每台机器上运行作业,然后问它内核的数量,但这并不一定是Spark所能提供的(正如@tribbloid在另一个答案的评论中所指出的):

import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
val nCpus = procs.values.sum

在shell中运行它(在一个有两个工作线程的小型测试集群上)会得到:

scala> :paste
// Entering paste mode (ctrl-D to finish)

    import spark.implicits._
    import scala.collection.JavaConverters._
    import sys.process._
    val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
    val nCpus = procs.values.sum

// Exiting paste mode, now interpreting.

import spark.implicits._                                                        
import scala.collection.JavaConverters._
import sys.process._
procs: scala.collection.immutable.Map[String,Int] = Map(ip-172-31-76-201.ec2.internal -> 2, ip-172-31-74-242.ec2.internal -> 2)
nCpus: Int = 4

如果集群中通常有 * 很多 * 机器,则在范围中添加零。即使在我的两台机器集群中,10000也会在几秒钟内完成。
这可能只在您想要比sc.defaultParallelism()给予的信息更多的信息时才有用(如@SteveC的答案)

2sbarzqh

2sbarzqh2#

对于所有不使用Yarn簇的产品:如果你用Python/Databricks来计算,我写了一个函数来帮助你解决这个问题,这个函数可以得到工作节点的数量和CPU的数量,并返回工作节点分布的最终CPU数量。

def GetDistCPUCount():
    nWorkers = int(spark.sparkContext.getConf().get('spark.databricks.clusterUsageTags.clusterTargetWorkers'))
    GetType = spark.sparkContext.getConf().get('spark.databricks.clusterUsageTags.clusterNodeType')
    GetSubString = pd.Series(test).str.split(pat = '_', expand = True)
    GetNumber = GetSubString[1].str.extract('(\d+)')
    ParseOutString = GetNumber.iloc[0,0]
    WorkerCPUs = int(ParseOutString)
    nCPUs = nWorkers * WorkerCPUs
    return nCPUs
fzsnzjdm

fzsnzjdm3#

有很多方法可以从Spark获得集群中的执行器数量和内核数量。这里有一些我以前用过的Scala实用程序代码。你应该可以很容易地将其应用到Java中。有两个关键的想法:
1.工作者的数量是执行者的数量减一或sc.getExecutorStorageStatus.length - 1
1.通过在工作线程上执行java.lang.Runtime.getRuntime.availableProcessors,可以获得每个工作线程的内核数。
剩下的代码是使用Scala隐式向SparkContext添加方便方法的样板。我在1.x年前编写了这段代码,这就是为什么它没有使用SparkSession
最后一点:这可以在数据不对称的情况下提高性能。在实践中,我使用1.5x到4x之间的任意值,具体取决于数据的大小以及作业是否在共享集群上运行。

import org.apache.spark.SparkContext

import scala.language.implicitConversions

class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    sc.getExecutorStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor

}

object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      else _coresPerExecutor
    }

}

更新

最近,getExecutorStorageStatus被删除了。我们改用SparkEnvblockManager.master.getStorageStatus.length - 1(减1还是用于驱动程序)。正常的方法是通过SparkContextenv访问它,但在org.apache.spark包之外是无法访问的。因此,我们使用了一个封装违规模式:

package org.apache.spark

object EncapsulationViolator {
  def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}
wlzqhblo

wlzqhblo4#

在寻找几乎相同问题的答案时发现了这一点。
我发现:

Dataset ds = ...
ds.coalesce(sc.defaultParallelism());

正好符合观察员的要求
例如,我的5节点x 8核心群集对于defaultParallelism返回40。

vsmadaxz

vsmadaxz5#

根据Databricks,如果驱动程序和执行器属于相同的节点类型,则应该这样做:

java.lang.Runtime.getRuntime.availableProcessors * (sc.statusTracker.getExecutorInfos.length -1)

相关问题