Scala: In GraphX, how can I partition a graph with a custom PartitionStrategy that uses the graph's topology?

pexxcrt2  posted on 2023-01-13  in Scala

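I want to add a new PartitionStrategy that uses the graph's topology, but PartitionStrategy has only the one function shown below, and I cannot find any function that receives the graph data.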

override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    println("partitioning!")
    numParts
  }

This function only receives the src and dst vertex IDs of a single edge.
In the Spark GraphX source, in org.apache.spark.graphx.impl.GraphImpl, I found the following code:

override def partitionBy(
      partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
    val edTag = classTag[ED]
    val vdTag = classTag[VD]
    val newEdges = edges.withPartitionsRDD(edges.map { e =>
      val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
      (part, (e.srcId, e.dstId, e.attr))
    }
      .partitionBy(new HashPartitioner(numPartitions))
      .mapPartitionsWithIndex(
        { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
          iter.foreach { message =>
            val data = message._2
            builder.add(data._1, data._2, data._3)
          }
          val edgePartition = builder.toEdgePartition
          Iterator((pid, edgePartition))
        }, preservesPartitioning = true)).cache()
    GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
  }

The call .partitionBy(new HashPartitioner(numPartitions)) resolves to partitionBy in the PairRDDFunctions class:

/**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

HashPartitioner is defined as follows:

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

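None of these functions has access to the graph data either.
I read the code of PowerGraph's distributed_constrained_random_ingress.hpp and PowerLyra's distributed_hybrid_ingress.hpp. In their preprocessing stage they have access to the graph, so its topology can be used.
I want to use the graph's topology as well, but I do not know how to add a function in Spark that reads the graph data and then assigns each edge a new PartitionID.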
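
To make the quoted code path concrete: the value returned by getPartition is only used as a shuffle key, and HashPartitioner then reduces that key modulo numPartitions. The plain-Scala sketch below reproduces that last step without Spark. nonNegativeMod is a local reimplementation that is assumed to behave like Spark's Utils.nonNegativeMod, and finalPartition is a hypothetical helper name introduced only for illustration.

```scala
object ShuffleKeySketch {
  // Local stand-in, assumed to behave like Spark's Utils.nonNegativeMod.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Hypothetical helper: the partition an edge lands in, given the value
  // the strategy returned. The hashCode of an Int key is the Int itself.
  def finalPartition(strategyResult: Int, numPartitions: Int): Int =
    nonNegativeMod(strategyResult.hashCode, numPartitions)
}
```

Under these assumptions, a strategy that always returns numParts (as the stub above does) sends every edge to partition 0, because finalPartition(numParts, numParts) is 0.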

rqqzpn5f  #1

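Here is one approach:

  • Collect the minimum necessary information from the graph
  • Instantiate a PartitionStrategy that captures this information

As a toy example, the snippet below partitions graph with the following rule: if the destination of an edge is also a source somewhere in the graph, assign the edge to partition 0; otherwise assign it to partition 1.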

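import org.apache.spark.graphx._

val graph: Graph[_, _] = [...]

graph.partitionBy(
  new PartitionStrategy {
    // Evaluated once on the driver when the strategy is instantiated:
    // select distinct sources only
    val capturedGraphData: Set[Long] = graph
      .edges
      .map(e => e.srcId)
      .distinct()
      .collect()
      .toSet

    // Called once per edge; only src and dst are passed in, so any
    // topology information has to come from capturedGraphData
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
      if(capturedGraphData.contains(dst)) 0
      else 1
    }
  }
)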
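Note on scalability: if the capturedGraphData your use case needs is too large, driver and executor memory will suffer. That is why it is important to select only the minimum necessary information from the graph: it is collected on the driver and then sent to every executor.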
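
As a slightly more topology-aware illustration of the same "collect a little, then capture it" idea, here is a minimal sketch of a degree-based rule, loosely modelled on the hybrid-cut idea mentioned in the question. It is written in plain Scala so that it runs without Spark. The type aliases are assumed to match GraphX's VertexId and PartitionID, and the names HybridCutSketch, HybridCut, highDegreeVertices and the threshold parameter are hypothetical. In a Spark project the class would extend PartitionStrategy, and the vertex set would be derived from graph.inDegrees rather than from a local Seq.

```scala
object HybridCutSketch {
  // Local aliases, assumed to match GraphX's VertexId (Long) and PartitionID (Int).
  type VertexId = Long
  type PartitionID = Int

  // Stand-in for graph.inDegrees: count incoming edges per destination vertex.
  def inDegrees(edges: Seq[(VertexId, VertexId)]): Map[VertexId, Int] =
    edges.groupBy(_._2).map { case (dst, es) => dst -> es.size }

  // Keep only the vertices whose in-degree exceeds the (hypothetical) threshold.
  // This small set is the "minimum necessary information" to capture.
  def highDegreeVertices(edges: Seq[(VertexId, VertexId)], threshold: Int): Set[VertexId] =
    inDegrees(edges).collect { case (v, d) if d > threshold => v }.toSet

  // In a Spark project this class would extend org.apache.spark.graphx.PartitionStrategy.
  final case class HybridCut(highDegree: Set[VertexId]) {
    private val mixingPrime: VertexId = 1125899906842597L

    // floorMod keeps the result in [0, numParts) even if the product overflows.
    private def hash(v: VertexId, numParts: PartitionID): PartitionID =
      java.lang.Math.floorMod(v * mixingPrime, numParts.toLong).toInt

    def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
      if (highDegree.contains(dst)) hash(src, numParts) // spread the in-edges of a hub
      else hash(dst, numParts)                          // keep in-edges of a low-degree vertex together
  }
}
```

Only the set of high-degree vertex IDs is captured, which keeps the data shipped to the executors small as long as the threshold is chosen so that few vertices qualify.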

pkln4tw6  #2

I use this approach (Java):

PartitionStrategy newPS = new PartitionStrategy() {
    @Override
    public int getPartition(long src, long dst, int numParts) {
        // Scala equivalent:
        //      val mixingPrime: VertexId = 1125899906842597L
        //      (math.abs(src * mixingPrime) % numParts).toInt
        long mixingPrime = 1125899906842597L;
        // Take the modulo on the long first, then narrow to int.
        // A cast binds tighter than %, so the parentheses are required.
        return (int) (Math.abs(src * mixingPrime) % numParts);
    }
};
graph.partitionBy(newPS);
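
One detail worth checking when porting the Scala one-liner to Java: a cast binds tighter than %, so an expression of the form `(int) Math.abs(x) % n` narrows the long to int before taking the remainder. The Scala sketch below contrasts the two orders using the same mixingPrime. The object and method names are hypothetical and exist only for this illustration.

```scala
object CastOrderSketch {
  val mixingPrime: Long = 1125899906842597L // equals 2^50 - 27

  // Narrow first, then take the remainder: the Int may already be negative,
  // and so may the result.
  def castThenMod(src: Long, numParts: Int): Int =
    math.abs(src * mixingPrime).toInt % numParts

  // Remainder on the Long first, then narrow: matches the Scala comment
  // (math.abs(src * mixingPrime) % numParts).toInt.
  def modThenCast(src: Long, numParts: Int): Int =
    (math.abs(src * mixingPrime) % numParts).toInt
}
```

For src = 1 and numParts = 4, castThenMod returns -3 (the low 32 bits of 2^50 - 27 read as -27), while modThenCast returns 1.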
