Spark examples 目录中有一个生成倾斜数据的示例(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala)。我不完全明白这段代码是如何生成倾斜数据的。在我看来,键似乎总是唯一的,因为我们使用的是 java.util.Random 中的方法。有人能解释一下吗?
我尝试在 spark-shell 中运行这段代码,但不理解其中的逻辑。有人能解释一下吗?
// Simulated command-line arguments: numMappers, numKVPairs, valSize
// (numReducers and ratio fall back to their defaults below).
scala> val args = Array("3", "1000", "1000")
args: Array[String] = Array(3, 1000, 1000)
// Number of map partitions used by parallelize().
scala> val numMappers = if (args.length > 0) args(0).toInt else 2
numMappers: Int = 3
// Key/value pairs generated per map partition.
scala> val numKVPairs = if (args.length > 1) args(1).toInt else 1000
numKVPairs: Int = 1000
// Size in bytes of each random value payload.
scala> val valSize = if (args.length > 2) args(2).toInt else 1000
valSize: Int = 1000
// Defaults to numMappers when a 4th argument is not supplied.
scala> val numReducers = if (args.length > 3) args(3).toInt else numMappers
numReducers: Int = 3
// Skew factor: reducer 0 gets ~ratio times more data than each other reducer.
// Note the mixed Int/Double branches widen the result to Double.
scala> val ratio = if (args.length > 4) args(4).toInt else 5.0
ratio: Double = 5.0
// Builds a skewed dataset: each of numMappers partitions emits numKVPairs
// (Int key, random Byte-array payload) pairs. Keys are NOT unique — every key
// has the form offset + r, where offset = nextInt(1000) * numReducers and
// r is in [0, numReducers), so the whole key space has only
// ~1000 * numReducers distinct values and collisions are guaranteed once
// numMappers * numKVPairs approaches that size. nextBytes only randomizes the
// VALUE payload; the KEY comes from the bounded nextInt draw.
val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
val result = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
val byteArr = new Array[Byte](valSize)
ranGen.nextBytes(byteArr)
// offset is always a multiple of numReducers, so offset % numReducers == 0.
val offset = ranGen.nextInt(1000) * numReducers
if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
// With probability ratio / (numReducers + ratio - 1), emit a key that is
// ≡ 0 (mod numReducers) — hash-partitioned to reducer 0, giving it a
// ratio-times-higher share of the data. This branch IS the skew.
// println("p:"+p+"offset:"+offset)
result(i) = (offset, byteArr)
} else {
// Otherwise pick one of the other reducers: key % numReducers is
// 1 + nextInt(numReducers - 1), i.e. never 0.
val key = 1 + ranGen.nextInt(numReducers-1) + offset
// println("p:"+p+"key:"+key)
result(i) = (key, byteArr)
}
}
result
}
// Total records: 3 mappers x 1000 pairs = 3000.
scala> pairs1.count
res11: Long = 3000
// groupByKey collapses duplicate keys: only 1618 DISTINCT keys remain out of
// 3000 records — direct evidence that keys collide, because they are drawn
// from a space of only ~1000 * numReducers (= 3000) possible values.
scala> println(s"RESULT: ${pairs1.groupByKey(numReducers).count}")
RESULT: 1618
暂无答案!
目前还没有任何答案,快来回答吧!