simpleskewidgroupbytest中的scala逻辑在spark中生成倾斜数据

ztyzrc3y  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(320)

spark examples目录中有一个生成倾斜数据的示例(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/simpleskewedgroupbytest.scala). 我不完全明白这个代码片段将如何生成扭曲的数据。在我看来,这个键总是唯一的,因为我们使用的是java.util.random中的方法。有人能解释一下吗。
我试着从sparkshell运行这个代码,但不理解逻辑。有人能解释一下吗?

scala> val args = Array("3", "1000", "1000")
args: Array[String] = Array(3, 1000, 1000)

scala> val numMappers = if (args.length > 0) args(0).toInt else 2
numMappers: Int = 3

scala> val numKVPairs = if (args.length > 1) args(1).toInt else 1000
numKVPairs: Int = 1000

scala> val valSize = if (args.length > 2) args(2).toInt else 1000
valSize: Int = 1000

scala> val numReducers = if (args.length > 3) args(3).toInt else numMappers
numReducers: Int = 3

scala> val ratio = if (args.length > 4) args(4).toInt else 5.0
ratio: Double = 5.0

val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p =>
      val ranGen = new Random
      val result = new Array[(Int, Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        val offset = ranGen.nextInt(1000) * numReducers
        if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
          // give ratio times higher chance of generating key 0 (for reducer 0)
          // println("p:"+p+"offset:"+offset)
          result(i) = (offset, byteArr)
        } else {
          // generate a key for one of the other reducers
          val key = 1 + ranGen.nextInt(numReducers-1) + offset
          // println("p:"+p+"key:"+key)
          result(i) = (key, byteArr)
        }
      }
      result
    }

scala>  pairs1.count
res11: Long = 3000

scala> println(s"RESULT: ${pairs1.groupByKey(numReducers).count}")
RESULT: 1618

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题