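I want to add a new PartitionStrategy that uses the graph's topology information. However, PartitionStrategy has only one function, shown below, and I can't find any function that receives the graph data: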
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  println("partitioning!")
  numParts // placeholder body: a real strategy should return a value in [0, numParts)
}
This function only ever sees a single src-dst pair.
In the Spark GraphX source, in org.apache.spark.graphx.impl.GraphImpl, I found the following code:
override def partitionBy(
    partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
  val edTag = classTag[ED]
  val vdTag = classTag[VD]
  val newEdges = edges.withPartitionsRDD(edges.map { e =>
    val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
    (part, (e.srcId, e.dstId, e.attr))
  }
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitionsWithIndex(
      { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
        iter.foreach { message =>
          val data = message._2
          builder.add(data._1, data._2, data._3)
        }
        val edgePartition = builder.toEdgePartition
        Iterator((pid, edgePartition))
      }, preservesPartitioning = true)).cache()
  GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}
The edges are then shuffled by this line:
.partitionBy(new HashPartitioner(numPartitions))
That partitionBy comes from the PairRDDFunctions class:
/**
 * Return a copy of the RDD partitioned using the specified partitioner.
 */
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
And HashPartitioner looks like this:
/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
But none of these functions can access the graph data.
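One detail worth spelling out: the HashPartitioner does not undo the strategy's decision. GraphImpl.partitionBy keys every edge by the PartitionID the strategy returned, and a boxed Int's hashCode is the int value itself, so for 0 <= part < numPartitions the modulo leaves part unchanged; out-of-range values wrap around (the placeholder above that returns numParts would send every edge to partition 0). The sketch below reproduces that arithmetic in plain Scala; nonNegativeMod copies the logic of Spark's internal Utils.nonNegativeMod, and the object name HashRouting is hypothetical.

```scala
object HashRouting {
  // Same arithmetic as Spark's internal Utils.nonNegativeMod, copied so this runs without Spark.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Same logic as HashPartitioner.getPartition shown above.
  // For an Int key, key.hashCode is the int value itself.
  def hashPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

So placement is decided entirely by getPartition; the HashPartitioner only moves each edge to the partition whose index equals the returned PartitionID.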
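I have read the code of PowerGraph's distributed_constrained_random_ingress.hpp and PowerLyra's distributed_hybrid_ingress.hpp. In their preprocessing stage they have access to the whole graph, so its topology information can be used.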
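I want to use the graph's topology in the same way, but I don't know how to add a new function in Spark that gets the graph data and then assigns each edge a new PartitionID.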
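To make "use the topology in preprocessing" concrete, here is a simplified sketch of a hybrid-cut style rule in the spirit of PowerLyra: one pass over all edges computes in-degrees, then each edge is placed by its target if the target's in-degree is at or below a threshold (keeping a low-degree vertex's in-edges together) and by its source otherwise (spreading a high-degree vertex's in-edges). This is not a port of distributed_hybrid_ingress.hpp; the threshold is a hypothetical parameter, floorMod on the raw vertex ID stands in for a real hash, and HybridCutSketch with its local VertexId/PartitionID aliases is a hypothetical stand-in for GraphX types.

```scala
object HybridCutSketch {
  // Local stand-ins with the same underlying types as GraphX's aliases.
  type VertexId = Long
  type PartitionID = Int

  // Preprocessing pass over the whole edge list: in-degree of every target vertex.
  def inDegrees(edges: Seq[(VertexId, VertexId)]): Map[VertexId, Int] =
    edges.groupBy(_._2).map { case (v, es) => v -> es.size }

  // Simplified hybrid-cut placement (threshold is a hypothetical tuning parameter):
  // low in-degree target -> place by target; high in-degree target -> place by source.
  // floorMod on the raw ID stands in for a real hash function.
  def hybridPartition(inDeg: Map[VertexId, Int], threshold: Int)(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val key = if (inDeg.getOrElse(dst, 0) <= threshold) dst else src
    java.lang.Math.floorMod(key, numParts.toLong).toInt
  }
}
```

In GraphX the degree information could come from graph.inDegrees (a VertexRDD[Int]) and be collected and broadcast before calling partitionBy; shipping only the IDs of vertices above the threshold keeps that broadcast small.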
2 answers
Answer 1:
Here is one way to do it:
PartitionStrategy
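As a mock example, here is a code snippet that partitions graph with the following rule: *if the destination of an edge is also a source somewhere in the graph, assign the edge to partition 0; otherwise assign it to partition 1.*
If capturedGraphData is too large, it will put heavy pressure on the memory of both the driver and the executors. That is why it is important to select only the minimum necessary information from the graph: it is collected on the driver and then broadcast to every executor.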
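A minimal Scala sketch of the rule this answer describes (not the answerer's original code): the set of source vertex IDs plays the role of capturedGraphData, it is computed once up front, and the strategy instance closes over it. To stay runnable without Spark, it uses a hypothetical local trait, EdgePartitionStrategy, with the same single method signature as GraphX's PartitionStrategy, plus local aliases for VertexId (Long) and PartitionID (Int); TopologyPartitioning, DstIsSourceStrategy, sourceSet, and assign are likewise hypothetical names.

```scala
object TopologyPartitioning {
  // Local stand-ins with the same underlying types as GraphX's aliases.
  type VertexId = Long
  type PartitionID = Int

  // Hypothetical local trait with the same single method as GraphX's PartitionStrategy.
  trait EdgePartitionStrategy extends Serializable {
    def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
  }

  // The "captured graph data": only the set of vertex IDs that occur as an edge source.
  def sourceSet(edges: Seq[(VertexId, VertexId)]): Set[VertexId] =
    edges.map(_._1).toSet

  // Rule from the answer: dst is also a source -> partition 0, otherwise partition 1.
  // `1 % numParts` keeps the result in range when only one partition is requested.
  final class DstIsSourceStrategy(sources: Set[VertexId]) extends EdgePartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
      if (sources.contains(dst)) 0 else 1 % numParts
  }

  // Mirrors the first step of GraphImpl.partitionBy: key each edge by the strategy's PartitionID.
  def assign(edges: Seq[(VertexId, VertexId)], strategy: EdgePartitionStrategy,
             numParts: PartitionID): Seq[(PartitionID, (VertexId, VertexId))] =
    edges.map { case (s, d) => (strategy.getPartition(s, d, numParts), (s, d)) }
}
```

In an actual GraphX job one would extend org.apache.spark.graphx.PartitionStrategy instead, build the set with something like graph.edges.map(_.srcId).distinct().collect().toSet, wrap it with sc.broadcast, read it through .value inside getPartition, and call graph.partitionBy(strategy, numPartitions). Passing the broadcast handle rather than the raw set is what keeps the data from being re-serialized with every task.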
Answer 2:
I use this method (Java):