pregelAPI-为什么小型图上的迭代会消耗这么多内存?

ut6juiuv  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(175)

我对spark和scala比较陌生,但是我决定在这里发布一个代码示例,它非常简单,在我看来不应该引起严重的问题,但是在实践中,它在aws emr spark环境中经常会导致内存不足错误,这取决于 maxIterations :

import java.net.URI

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException

val config = new SparkConf().setAppName("test graphx")
config.set("spark.driver.allowMultipleContexts","true")
val batch_id=new Integer(31)

val maxIterations=2 //200 interations are causing out of memory

var myVertices = sc.makeRDD(Array( (1L,  ("A",batch_id,0.0,0.0,0.0,11.0)), (2L,  ("B",batch_id,0.0,1000.0,0.0,300.0)), (3L, ( "C", batch_id, 1000.0, 1000.0, 0.0, 8.0)), (4L,  ("D",batch_id,1000.0, 0.0, 0.0, 400.0)) ))
var myEdges = sc.makeRDD(Array(Edge(4L, 3L, (7.7, 0.0) ), Edge(2L, 3L, (5.0, 0.0) ), Edge(2L, 1L, (12.0, 0.0))))

var myGraph=Graph(myVertices,myEdges)
    myGraph.cache

myGraph.triplets.foreach(println)

//we need to calculate some constant values for each edge before start of pregel
val initGraph=myGraph.mapTriplets(tr => 
    (tr.attr._1,  (tr.attr._1 * 
                    (scala.math.sqrt((tr.dstAttr._3-tr.srcAttr._3)*(tr.dstAttr._3-tr.srcAttr._3)+( tr.dstAttr._4-tr.srcAttr._4)*( tr.dstAttr._4-tr.srcAttr._4)+(tr.dstAttr._5-tr.srcAttr._5)*(tr.dstAttr._5-tr.srcAttr._5))) *
                    (scala.math.sqrt((tr.dstAttr._3-tr.srcAttr._3)*(tr.dstAttr._3-tr.srcAttr._3)+( tr.dstAttr._4-tr.srcAttr._4)*( tr.dstAttr._4-tr.srcAttr._4)+(tr.dstAttr._5-tr.srcAttr._5)*(tr.dstAttr._5-tr.srcAttr._5))) /
                    (tr.dstAttr._6 * tr.dstAttr._6)) 
    )

)

initGraph.triplets.take(100).foreach(println)

val distanceStep = 0.1
val tolerance = 1

val sssp = initGraph.pregel( (0.0, 0.0, 0.0, 0.0), maxIterations //500-3000
 )(
  (id: VertexId, vert: ((String, Integer, Double, Double, Double, Double)), msg: (Double, Double, Double, Double)) =>
    (
      vert._1,vert._2,
      ( if (scala.math.abs(msg._1)> tolerance) {vert._3+distanceStep*msg._1 } else { vert._3 }),
      ( if (scala.math.abs(msg._2)> tolerance) {vert._4+distanceStep*msg._2 } else { vert._4 }),
      ( if (scala.math.abs(msg._3)> tolerance) {vert._5+distanceStep*msg._3 } else { vert._5 }),
      vert._6
    ),// Vertex Program
  e => {  // Send Message
    Iterator(
      (
        e.dstId,
        (
          ((e.srcAttr._3 - e.dstAttr._3)*distanceStep*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )), //x
          ((e.srcAttr._4 - e.dstAttr._4)*distanceStep*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )), //y
          ((e.srcAttr._5 - e.dstAttr._5)*distanceStep*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )), //z
          e.attr._1*distanceStep*scala.math.sqrt((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) //vector module
        )
      )
    )
  },
  {
    (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, 0) // Merge Message
  }
)

sssp.vertices.take(10).foreach(println)

我通过zeppelin在4节点m5.x2大型集群上的aws emr中运行它,但是它可以在spark中作为作业快速采用和执行。
简而言之,这段代码创建了一个 myGraph 具有4个顶点和3条边的图。然后对于每个三元组,我计算一些常量值并使用图形对象 initGraph 为了这个。
那么对于 initGraph 我应用pregelapi,它的执行只受迭代次数的限制 maxIterations . 在这一刻,对于pregelapi,我看到了奇怪的行为。对于小型 maxIterations 值(小于10)它工作得非常快,对于100-150次迭代,它在齐柏林飞艇中执行3-4分钟,对于200次迭代,它会以不同的错误失败(connectionclosed等)。
当我把maxiterations=150或200设置为

分配的内存直线上升,可用内存以同样的速度下降。
由于我是spark的新手,我不确定这是不是正确的行为,老实说,我找不到一个解释,即使在这样一个小的图上,200次pregel迭代,什么会消耗千兆字节的内存。如果您可以在您的终端上复制它并进行检查,我很好奇地听取您关于性能优化的建议,因为如果我扩展集群并在更大的硬件设置上运行相同的代码,这只是一个问题 maxIterations 和图的大小来实际得到相同的outofmemory错误。我需要在超过1米的顶点和7米的边上运行这个程序,所以我不知道如果这个问题不能解决,需要什么样的硬件。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题