无法使用repartitionandsortwithinpartitions方法创建分区

js81xvg6  于 2021-06-08  发布在  Hbase
关注(0)|答案(0)|浏览(250)

我有rdd rddData: RDD[(String, Iterable[(String, String)])] 按密钥和基于密钥的预分裂区域排序, splits: Array[Array[Byte]] . 下面是我用来创建分区的代码片段 repartitionAndSortWithinPartitions 方法:

protected abstract class HFilePartitioner extends Partitioner {
    def extractKey(n: Any) = n match {
      case (k: java.lang.String, _) => k
      case (_) => 
    }
}

  class datasetPartitioner(splits: Array[Array[Byte]]) extends HFilePartitioner {   

    override def getPartition(key: Any): Int = {
      val k = extractKey(key)
      for (i <- 1 until splits.length)
        if (Bytes.compareTo(Bytes.toBytes(k.toString()), splits(i)) < 0) return i - 1

      splits.length - 1
    }
    override def numPartitions: Int = splits.length
  }

调用 repartitionAndSortWithinPartitions 方法

val partitionedData = rddData.repartitionAndSortWithinPartitions(new datasetPartitioner(splits))

我可以看到使用 partitionedData.partitions.length . 然后我用 partitionedData.mapPartitionsWithIndex((index, it) =>it.toList.map(x => if (index ==1) {println(x._1)}).iterator).collect 打印哪个部分包含哪些键,但只获取所有键 index == 0 ,其他分区不包含任何数据。虽然已经创建了6个分区,但是数据没有在分区之间分布。我想在所有分区之间分发数据。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题