spark中的默认分区

dgsult0t  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(384)

我有一个关于rdd中默认分区的问题。

case class Animal(id:Int, name:String)   
val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"), Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5, "Chetah") ) ))

Console println myRDD.getNumPartitions

我正在我的笔记本电脑上运行上面的代码,它有12个逻辑核心。因此,我看到创建了12个分区。
我的理解是散列分区用于确定哪个对象需要转到哪个分区。所以在本例中,公式是:hashcode()%12,但是当我进一步检查时,我看到所有的RDD都放在最后一个分区中。

myRDD.foreachPartition( e => { println("----------"); e.foreach(println) } )

上面的代码打印下面的(前11个分区是空的,最后一个分区有所有的对象。该行用于分隔每个分区的内容):

----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
Animal(2,Elephant)
Animal(4,Tiger)
Animal(3,Jaguar)
Animal(5,Chetah)
Animal(1,Lion)

我不知道为什么会这样。你能帮忙吗。
谢谢!

kmbjn2e3

kmbjn2e31#

我不认为这意味着你所有的数据都在最后一个分区。相反,因为 foreachPartition 如果并行执行,则可能是在打印值之前,虚线已经从所有执行器中打印出来了。打印行的顺序并不表示执行顺序。
如果您尝试下面的代码(源代码),您可以看到数据在执行器之间均匀地进行了分区(至少在我的计算机上是这样):

myRDD.mapPartitionsWithIndex((index, itr) => itr.toList.map(x => x + "#" + index).iterator).collect
// res6: Array[String] = Array(Animal(1,Lion)#1, Animal(2,Elephant)#2, Animal(3,Jaguar)#3, Animal(4,Tiger)#4, Animal(5,Chetah)#5)

相关问题