Spark 2.3: how to release RDDs from memory in an iterative algorithm

Asked by 93ze6v8z on 2021-07-13 in Spark

Starting from the Dijkstra example at https://livebook.manning.com/book/spark-graphx-in-action/chapter-6/1:

import org.apache.spark.graphx._

// Dijkstra's algorithm from the book. Each vertex carries a triple of
// (visited flag, tentative distance from origin, path of predecessor ids).
def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
  var g2 = g.mapVertices(
    (vid, vd) => (false, if (vid == origin) 0.0 else Double.MaxValue,
      List[VertexId]()))

  for (i <- 1L to g.vertices.count - 1) {
    // Select the unvisited vertex with the smallest tentative distance.
    val currentVertexId = g2.vertices
      .filter(!_._2._1)
      .fold((0L, (false, Double.MaxValue, List[VertexId]())))((a, b) =>
        if (a._2._2 < b._2._2) a else b)
      ._1

    // Propagate candidate (distance, path) pairs along the current
    // vertex's out-edges, keeping the smaller distance on collision.
    val newDistances = g2.aggregateMessages[(Double, List[VertexId])](
      ctx => if (ctx.srcId == currentVertexId)
        ctx.sendToDst((
          ctx.srcAttr._2 + ctx.attr,
          ctx.srcAttr._3 :+ ctx.srcId)),
      (a, b) => if (a._1 < b._1) a else b)

    // Mark the current vertex visited and merge in any shorter distances.
    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
      val newSumVal =
        newSum.getOrElse((Double.MaxValue, List[VertexId]()))
      (vd._1 || vid == currentVertexId,
        math.min(vd._2, newSumVal._1),
        if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)
    })
  }

  // Join the results back onto the original graph, keeping the original
  // vertex attribute plus List(distance, path).
  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
      .productIterator.toList.tail))
}

val myVertices = spark.sparkContext.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"),
  (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
val myEdges = spark.sparkContext.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
  Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),
  Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
  Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
val myGraph = Graph(myVertices, myEdges)

val result = dijkstra(myGraph, 1L)

result.vertices.map(_._2).collect
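
One way to confirm what is piling up (a small diagnostic sketch; SparkContext.getPersistentRDDs is a public API, and the printout is just each RDD's toString) is to list the persisted RDDs after a run. Each call to dijkstra leaves behind cached VertexRDDs and EdgeRDDs created by GraphX's internal caching:

// List everything Spark currently holds persisted, keyed by RDD id.
spark.sparkContext.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"$id -> $rdd")
}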

Every time I run this code, the VertexRDDs stay in memory and I cannot release them.

It looks as though GraphX is caching the graph data even though nothing in the code asks for it. Is there a way to release the RDDs left over from the previous run?
I tried to drop the persisted data with result.unpersist(), result.vertices.unpersist(), result.edges.unpersist(), and even result.checkpoint().
Ultimately I want to run this code in a for loop to compute results for several different values of origin, but unless I can find a way to release the RDDs from previous iterations, I will run into memory problems.
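
For what it is worth, GraphX's own Pregel loop handles exactly this situation by caching the new graph on every iteration, materializing it, and only then unpersisting the previous one. A sketch of how the loop body above could be adapted in the same spirit (untested against this exact code; prevG2 is a name introduced here):

// Keep a handle to the previous graph, materialize the new one, then
// release the old one so its VertexRDD/EdgeRDD become evictable.
val prevG2 = g2
g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
  val newSumVal =
    newSum.getOrElse((Double.MaxValue, List[VertexId]()))
  (vd._1 || vid == currentVertexId,
    math.min(vd._2, newSumVal._1),
    if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)
}).cache()
g2.vertices.count()                          // force materialization first
prevG2.unpersistVertices(blocking = false)   // then drop the old vertices
prevG2.edges.unpersist(blocking = false)     // ...and the old edges

Because the lineage chain still grows across iterations, long loops may additionally need an occasional g2.checkpoint() (after sc.setCheckpointDir) to truncate it.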
Update: I came up with a brute-force way to clear all of the VertexRDDs and EdgeRDDs:

// Brute force: walk every persisted RDD and unpersist any whose string
// representation marks it as a VertexRDD or EdgeRDD.
for ((k, v) <- spark.sparkContext.getPersistentRDDs) {
  val convertedToString = v.toString()
  if (convertedToString.contains("VertexRDD") || convertedToString.contains("EdgeRDD")) {
    v.unpersist()
  }
}
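
A slightly more targeted variant of the same brute-force cleanup (again only a sketch) pattern-matches on the actual RDD types instead of their toString output:

import org.apache.spark.graphx.{EdgeRDD, VertexRDD}

// Unpersist cached GraphX RDDs by type; leave other cached RDDs alone.
for ((_, rdd) <- spark.sparkContext.getPersistentRDDs) {
  rdd match {
    case v: VertexRDD[_] => v.unpersist(blocking = false)
    case e: EdgeRDD[_]   => e.unpersist(blocking = false)
    case _               =>
  }
}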

No answers yet.

