最后一个与我有关的问题是从2011年开始,我再问一次。。
我试图证明在并行化的spark阵列上进行聚合比在普通阵列上进行聚合要快(都是在4核的dellxps上)。
import org.apache.spark.{SparkConf, SparkContext}
object SparkStuffer extends App {
val appName: String = "My Spark Stuffer"
val master: String = "local"
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)
// Returns '4'
println("Available processors: " + Runtime.getRuntime().availableProcessors())
val data = 1 to 100000000
val distData = sc.parallelize(data)
val sequData = data
val parallelIni = java.lang.System.currentTimeMillis();
distData.reduce((a, b) => a+b)
val parallelFin = java.lang.System.currentTimeMillis();
val seqIni = java.lang.System.currentTimeMillis();
sequData.reduce((a, b) => a+b)
val seqFin = java.lang.System.currentTimeMillis();
println("Par: " + (parallelFin - parallelIni))
println("Seq: " + (seqFin - seqIni))
// Par: 3262
// Seq: 1099
}
我添加build.sbt作为参考:
name := "spark_stuff"
version := "0.1"
scalaVersion := "2.12.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
为什么并行聚合没有更快?如果不是在这里,有什么例子可以说明它更快?
1条答案
按热度按时间bmp9r5qi1#
这里有一个误解,你的第一步
distData.reduce((a, b) => a+b)
在这里做两件事。一是分发数据,二是处理数据。不仅处理你所期望的。spark框架在执行代码块的过程中有两个步骤:转换和操作。转换步骤是当spark只是准备后端需要做什么,检查数据是否存在,你正在做的事情是否有意义等等。这就是这里发生的事情:
sc.parallelize(data)
. 在这一刻,你的代码不是并行化任何东西,只是准备并行化,并行化发生在你运行的时候distData.reduce((a, b) => a+b)
这是一个动作,然后处理数据。我在我的群集中运行了相同的示例,下面是一些可以用作参考的结果:
在这里我们得到的执行就像你的代码:
这里有一个小小的改变,强制在reduce之前并行化,以消除使用以下代码分发的开销:
下面是它的速度:
但是,我们需要知道的是,并不是所有的分布式算法都会在开销上超过迭代算法。您的数据集非常小,而且大部分是在内存中构建的。因此,分布式代码只会在一定大小的数据中打败顺序代码。尺寸是多少?视情况而定。但是结论是,在这种情况下并行执行比较慢,因为动作步骤parallelize然后执行reduce。