scala—为什么并行聚合在spark中没有更快?

xt0899hw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(319)

最后一个与我有关的问题是从2011年开始,我再问一次。。
我试图证明在并行化的spark阵列上进行聚合比在普通阵列上进行聚合要快(都是在4核的dellxps上)。

import org.apache.spark.{SparkConf, SparkContext}

object SparkStuffer extends App {

    val appName: String = "My Spark Stuffer"
    val master:  String = "local"

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val sc = new SparkContext(conf)

    // Returns '4'
    println("Available processors: " + Runtime.getRuntime().availableProcessors())

    val data = 1 to 100000000

    val distData = sc.parallelize(data)
    val sequData = data

    val parallelIni = java.lang.System.currentTimeMillis();
    distData.reduce((a, b) => a+b)
    val parallelFin = java.lang.System.currentTimeMillis();

    val seqIni = java.lang.System.currentTimeMillis();
    sequData.reduce((a, b) => a+b)
    val seqFin = java.lang.System.currentTimeMillis();

    println("Par: " + (parallelFin - parallelIni))
    println("Seq: " + (seqFin - seqIni))

    // Par: 3262
    // Seq: 1099

}

我添加build.sbt作为参考:

name := "spark_stuff"

version := "0.1"

scalaVersion := "2.12.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"

为什么并行聚合没有更快?如果不是在这里,有什么例子可以说明它更快?

bmp9r5qi

bmp9r5qi1#

这里有一个误解,你的第一步 distData.reduce((a, b) => a+b) 在这里做两件事。一是分发数据,二是处理数据。不仅处理你所期望的。
spark框架在执行代码块的过程中有两个步骤:转换和操作。转换步骤是当spark只是准备后端需要做什么,检查数据是否存在,你正在做的事情是否有意义等等。这就是这里发生的事情: sc.parallelize(data) . 在这一刻,你的代码不是并行化任何东西,只是准备并行化,并行化发生在你运行的时候 distData.reduce((a, b) => a+b) 这是一个动作,然后处理数据。
我在我的群集中运行了相同的示例,下面是一些可以用作参考的结果:
在这里我们得到的执行就像你的代码:

这里有一个小小的改变,强制在reduce之前并行化,以消除使用以下代码分发的开销:

val data = 1 to 100000000
val distData = sc.parallelize(data)
distData.count()
distData.reduce((a, b) => a+b)

下面是它的速度:

但是,我们需要知道的是,并不是所有的分布式算法都会在开销上超过迭代算法。您的数据集非常小,而且大部分是在内存中构建的。因此,分布式代码只会在一定大小的数据中打败顺序代码。尺寸是多少?视情况而定。但是结论是,在这种情况下并行执行比较慢,因为动作步骤parallelize然后执行reduce。

相关问题