以https://livebook.manning.com/book/spark-graphx-in-action/chapter-6/1
import org.apache.spark.graphx._
def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
var g2 = g.mapVertices(
(vid,vd) => (false, if (vid == origin) 0.0 else Double.MaxValue,
List[VertexId]()))
for (i <- 1L to g.vertices.count-1) {
val currentVertexId =
g2.vertices.filter(!_._2._1)
.fold((0L,(false,Double.MaxValue,List[VertexId]())))((a,b) =>
if (a._2._2 < b._2._2) a else b)
._1
val newDistances = g2.aggregateMessages[(Double,List[VertexId])](
ctx => if (ctx.srcId == currentVertexId)
ctx.sendToDst((
ctx.srcAttr._2 + ctx.attr,
ctx.srcAttr._3 :+ ctx.srcId)),
(a,b) => if (a._1 < b._1) a else b)
g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
val newSumVal =
newSum.getOrElse((Double.MaxValue,List[VertexId]()))
(vd._1 || vid == currentVertexId,
math.min(vd._2, newSumVal._1),
if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)})
}
g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
(vd, dist.getOrElse((false,Double.MaxValue,List[VertexId]())).productIterator.toList.tail))
}
val myVertices = spark.sparkContext.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"),
(4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
val myEdges = spark.sparkContext.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),
Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
val myGraph = Graph(myVertices, myEdges)
val result = dijkstra(myGraph, 1L)
result.vertices.map(_._2).collect
每次我运行这个代码 VertexRDD
留在记忆里,我无法释放它。
似乎graphx正在缓存图形数据,即使代码中没有指定它。是否可以从内存中释放上一次运行的rdd数据?
我试着通过这样做来消除持久性 result.unpersist()
, result.vertices.unpersist()
, result.edges.unpersist()
,甚至 result.checkpoint()
.
最后,我希望在for循环中运行代码,以查找不同类型的多个结果 origin
,除非我能从以前的版本中找到释放RDD的方法,否则我会遇到内存问题。
更新:我想出了一个暴力方法来清除所有的vertexrdd和edgerdd
for ((k,v) <- spark.sparkContext.getPersistentRDDs) {
val convertedToString = v.toString()
if (convertedToString.contains("VertexRDD") || convertedToString.contains("EdgeRDD")) {
v.unpersist()
}
}
暂无答案!
目前还没有任何答案,快来回答吧!