我在yarn cluster中运行我的spark应用程序。在我的代码中,我使用队列的可用内核数来创建数据集的分区:
Dataset ds = ... ds.coalesce(config.getNumberOfCores());
我的问题:如何通过编程方式而不是通过配置方式获得队列中可用内核数?
bsxbgnwa1#
你可以在每台机器上运行作业,然后问它内核的数量,但这并不一定是Spark所能提供的(正如@tribbloid在另一个答案的评论中所指出的):
import spark.implicits._ import scala.collection.JavaConverters._ import sys.process._ val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap val nCpus = procs.values.sum
在shell中运行它(在一个有两个工作线程的小型测试集群上)会得到:
scala> :paste // Entering paste mode (ctrl-D to finish) import spark.implicits._ import scala.collection.JavaConverters._ import sys.process._ val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap val nCpus = procs.values.sum // Exiting paste mode, now interpreting. import spark.implicits._ import scala.collection.JavaConverters._ import sys.process._ procs: scala.collection.immutable.Map[String,Int] = Map(ip-172-31-76-201.ec2.internal -> 2, ip-172-31-74-242.ec2.internal -> 2) nCpus: Int = 4
如果集群中通常有 * 很多 * 机器,则在范围中添加零。即使在我的两台机器集群中,10000也会在几秒钟内完成。这可能只在您想要比sc.defaultParallelism()给予的信息更多的信息时才有用(如@SteveC的答案)
2sbarzqh2#
对于所有不使用Yarn簇的产品:如果你用Python/Databricks来计算,我写了一个函数来帮助你解决这个问题,这个函数可以得到工作节点的数量和CPU的数量,并返回工作节点分布的最终CPU数量。
def GetDistCPUCount(): nWorkers = int(spark.sparkContext.getConf().get('spark.databricks.clusterUsageTags.clusterTargetWorkers')) GetType = spark.sparkContext.getConf().get('spark.databricks.clusterUsageTags.clusterNodeType') GetSubString = pd.Series(test).str.split(pat = '_', expand = True) GetNumber = GetSubString[1].str.extract('(\d+)') ParseOutString = GetNumber.iloc[0,0] WorkerCPUs = int(ParseOutString) nCPUs = nWorkers * WorkerCPUs return nCPUs
fzsnzjdm3#
有很多方法可以从Spark获得集群中的执行器数量和内核数量。这里有一些我以前用过的Scala实用程序代码。你应该可以很容易地将其应用到Java中。有两个关键的想法:1.工作者的数量是执行者的数量减一或sc.getExecutorStorageStatus.length - 1。1.通过在工作线程上执行java.lang.Runtime.getRuntime.availableProcessors,可以获得每个工作线程的内核数。剩下的代码是使用Scala隐式向SparkContext添加方便方法的样板。我在1.x年前编写了这段代码,这就是为什么它没有使用SparkSession。最后一点:这可以在数据不对称的情况下提高性能。在实践中,我使用1.5x到4x之间的任意值,具体取决于数据的大小以及作业是否在共享集群上运行。
sc.getExecutorStorageStatus.length - 1
java.lang.Runtime.getRuntime.availableProcessors
SparkContext
SparkSession
import org.apache.spark.SparkContext import scala.language.implicitConversions class RichSparkContext(val sc: SparkContext) { def executorCount: Int = sc.getExecutorStorageStatus.length - 1 // one is the driver def coresPerExecutor: Int = RichSparkContext.coresPerExecutor(sc) def coreCount: Int = executorCount * coresPerExecutor def coreCount(coresPerExecutor: Int): Int = executorCount * coresPerExecutor } object RichSparkContext { trait Enrichment { implicit def enrichMetadata(sc: SparkContext): RichSparkContext = new RichSparkContext(sc) } object implicits extends Enrichment private var _coresPerExecutor: Int = 0 def coresPerExecutor(sc: SparkContext): Int = synchronized { if (_coresPerExecutor == 0) sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head else _coresPerExecutor } }
更新
最近,getExecutorStorageStatus被删除了。我们改用SparkEnv的blockManager.master.getStorageStatus.length - 1(减1还是用于驱动程序)。正常的方法是通过SparkContext的env访问它,但在org.apache.spark包之外是无法访问的。因此,我们使用了一个封装违规模式:
getExecutorStorageStatus
SparkEnv
blockManager.master.getStorageStatus.length - 1
env
org.apache.spark
package org.apache.spark object EncapsulationViolator { def sparkEnv(sc: SparkContext): SparkEnv = sc.env }
wlzqhblo4#
在寻找几乎相同问题的答案时发现了这一点。我发现:
Dataset ds = ... ds.coalesce(sc.defaultParallelism());
正好符合观察员的要求例如,我的5节点x 8核心群集对于defaultParallelism返回40。
defaultParallelism
vsmadaxz5#
根据Databricks,如果驱动程序和执行器属于相同的节点类型,则应该这样做:
java.lang.Runtime.getRuntime.availableProcessors * (sc.statusTracker.getExecutorInfos.length -1)
5条答案
按热度按时间bsxbgnwa1#
你可以在每台机器上运行作业,然后问它内核的数量,但这并不一定是Spark所能提供的(正如@tribbloid在另一个答案的评论中所指出的):
在shell中运行它(在一个有两个工作线程的小型测试集群上)会得到:
如果集群中通常有 * 很多 * 机器,则在范围中添加零。即使在我的两台机器集群中,10000也会在几秒钟内完成。
这可能只在您想要比sc.defaultParallelism()给予的信息更多的信息时才有用(如@SteveC的答案)
2sbarzqh2#
对于所有不使用Yarn簇的产品:如果你用Python/Databricks来计算,我写了一个函数来帮助你解决这个问题,这个函数可以得到工作节点的数量和CPU的数量,并返回工作节点分布的最终CPU数量。
fzsnzjdm3#
有很多方法可以从Spark获得集群中的执行器数量和内核数量。这里有一些我以前用过的Scala实用程序代码。你应该可以很容易地将其应用到Java中。有两个关键的想法:
1.工作者的数量是执行者的数量减一或
sc.getExecutorStorageStatus.length - 1
。1.通过在工作线程上执行
java.lang.Runtime.getRuntime.availableProcessors
,可以获得每个工作线程的内核数。剩下的代码是使用Scala隐式向
SparkContext
添加方便方法的样板。我在1.x年前编写了这段代码,这就是为什么它没有使用SparkSession
。最后一点:这可以在数据不对称的情况下提高性能。在实践中,我使用1.5x到4x之间的任意值,具体取决于数据的大小以及作业是否在共享集群上运行。
更新
最近,
getExecutorStorageStatus
被删除了。我们改用SparkEnv
的blockManager.master.getStorageStatus.length - 1
(减1还是用于驱动程序)。正常的方法是通过SparkContext
的env
访问它,但在org.apache.spark
包之外是无法访问的。因此,我们使用了一个封装违规模式:wlzqhblo4#
在寻找几乎相同问题的答案时发现了这一点。
我发现:
正好符合观察员的要求
例如,我的5节点x 8核心群集对于
defaultParallelism
返回40。vsmadaxz5#
根据Databricks,如果驱动程序和执行器属于相同的节点类型,则应该这样做: