Sparkrdd分区效应

gt0wga4j  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(436)

我对重新分区操作感到困惑。请看下面的代码

import org.apache.spark._
import org.apache.log4j._

object FriendsByAge {

  def parseLine(line: String)={
    val fields = line.split(",")
    val age = fields(2).toInt
    val numFriends = fields(3).toInt

    (age, numFriends)
  }

  def main(args: Array[String]) = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]", "FriendsByAge")

    val lines = sc.textFile("./data/fakefriends-noheader.csv").repartition(1000)
    val rdd = lines.map(parseLine)

    println(rdd.getNumPartitions)

    val totalsByAge = rdd.mapValues(x=> (x,1)).reduceByKey((x, y) => (x._1+y._1, x._2 + y._2))

    println(totalsByAge.getNumPartitions)

    val averagesByAges = totalsByAge.mapValues(x => x._1/x._2)

    println(averagesByAges.getNumPartitions)
    val results = averagesByAges.collect()

    results.sortWith(_._2> _._2).foreach(println)
  }

}

在这里,我在将文件读入1000个分区之后重新划分rdd。因为Map操作会创建新的rdd,所以分区不会被保留。我仍然看到相同数量的分区。
问题是如何知道子rdd是否会保留父rdd分区?当子rdd使重新分区无效时,标准是什么。

nbysray5

nbysray51#

mapValues 不会改变已经生效的分区,它是 narrow 转变。你有两个。 reduceByKey 是关联的。spark在本地聚合并将这些结果发送到驱动程序或相关分区(在您的情况下)。如果不使用上的参数 reduceByKey 为了 number of partitions ,则为新rdd保留相同数量的分区,尽管分布不同。

相关问题