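I have an RDD, rddData: RDD[(String, Iterable[(String, String)])], that I want to sort by key and partition according to pre-split regions derived from the keys, given as splits: Array[Array[Byte]]. Below is the code snippet I use to create the partitioner for the repartitionAndSortWithinPartitions method: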
protected abstract class HFilePartitioner extends Partitioner {
  def extractKey(n: Any) = n match {
    case (k: java.lang.String, _) => k
    case (_) =>
  }
}
class datasetPartitioner(splits: Array[Array[Byte]]) extends HFilePartitioner {
  override def getPartition(key: Any): Int = {
    val k = extractKey(key)
    for (i <- 1 until splits.length)
      if (Bytes.compareTo(Bytes.toBytes(k.toString()), splits(i)) < 0) return i - 1
    splits.length - 1
  }
  override def numPartitions: Int = splits.length
}
Calling the repartitionAndSortWithinPartitions method:
val partitionedData = rddData.repartitionAndSortWithinPartitions(new datasetPartitioner(splits))
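One thing worth checking here: Spark's shuffle calls getPartition with the key of each record only, not with the whole (key, value) pair. For this RDD the argument is therefore a plain String. The following is a minimal sketch, not a confirmed diagnosis, of what extractKey returns in that case. It copies extractKey into a standalone object (the name ExtractKeyDemo and the sample key "row5" are made up for illustration) so it runs without Spark:

```scala
object ExtractKeyDemo {
  // Copy of extractKey from the question, lifted out of the partitioner class.
  // The wildcard case has an empty body in the original, which evaluates to Unit.
  def extractKey(n: Any): Any = n match {
    case (k: java.lang.String, _) => k
    case _ => ()
  }

  def main(args: Array[String]): Unit = {
    // A (key, value) tuple matches the first case and yields the key.
    println(extractKey(("row5", "value")))   // prints: row5
    // A bare String is not a tuple, so it falls through to the wildcard.
    println(extractKey("row5").toString)     // prints: ()
  }
}
```

If that is what happens inside getPartition, every key is compared as the two-character string "()" rather than as its real value, and all records would be assigned the same partition number.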
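I can see that the partitions were created by checking partitionedData.partitions.length. I then use partitionedData.mapPartitionsWithIndex((index, it) =>it.toList.map(x => if (index ==1) {println(x._1)}).iterator).collect
to print which partition contains which keys, but I only get keys when index == 0; every key ends up there
and the other partitions contain no data. Although 6 partitions have been created, the data is not distributed among them. I want the data to be distributed across all partitions.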
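The split lookup itself can be checked outside the cluster. The sketch below is plain Scala under stated assumptions: compareBytes is a hypothetical stand-in for HBase's Bytes.compareTo (unsigned lexicographic comparison), the split points "", "b", "d", "f" are invented, and partitionFor uses the String key directly instead of going through extractKey. The sample key "()" is included because that is how Scala's Unit value prints, which is what a match that falls through to an empty wildcard case produces.

```scala
object SplitLookupDemo {
  // Unsigned lexicographic comparison of two byte arrays.
  // Hypothetical stand-in for org.apache.hadoop.hbase.util.Bytes.compareTo.
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val x = a(i) & 0xff
      val y = b(i) & 0xff
      if (x != y) return x - y
      i += 1
    }
    a.length - b.length
  }

  // Same rule as getPartition in the question: the first split (from index 1)
  // that is greater than the key decides the partition, else the last one.
  def partitionFor(key: String, splits: Array[Array[Byte]]): Int = {
    val keyBytes = key.getBytes("UTF-8")
    // Position p in the range (1 until length) corresponds to split index p + 1,
    // so p is already the partition number (i - 1 in the original loop).
    val p = (1 until splits.length).indexWhere(i => compareBytes(keyBytes, splits(i)) < 0)
    if (p >= 0) p else splits.length - 1
  }

  def main(args: Array[String]): Unit = {
    // Invented split points; the first region starts at the empty key.
    val splits = Array("", "b", "d", "f").map(_.getBytes("UTF-8"))
    for (k <- Seq("apple", "cherry", "egg", "zebra", "()"))
      println(s"$k -> ${partitionFor(k, splits)}")
  }
}
```

With these invented splits, real keys spread over partitions 0 to 3, while "()" lands in partition 0. Printing the value of k inside getPartition on the real data would show whether the partitioner is comparing actual keys or a placeholder string.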