graphx-缩放连接组件

uurv41yg  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(408)

我试图使用连接的组件,但有缩放问题。这就是我所拥有的-

// get vertices
val vertices = stage_2.flatMap(x => GraphUtil.getVertices(x)).cache

// get edges
val edges = stage_2.map(x => GraphUtil.getEdges(x)).filter(_ != null).flatMap(x => x).cache

// create graph  
val identityGraph = Graph(vertices, edges)

// get connected components
val cc = identityGraph.connectedComponents.vertices

其中,graphutil具有用于返回顶点和边的辅助函数。此时,我的图有约100万个节点和约200万条边(顺便说一句,这将增长到约1亿个节点)。我的图是相当稀疏的连接-所以我期待大量的小图。
当我运行以上,我不断得到 java.lang.OutOfMemoryError: Java heap space . 我试过了 executor-memory 32g 并以45g为Yarn容器尺寸,运行15个节点的集群。
以下是例外详细信息:

16/10/26 10:32:26 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:360)
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:98)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2216)
    at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
    at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:173)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

此外,我还收到了大量以下日志:

16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 320 is 263 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 321 is 268 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 322 is 264 bytes

我的问题是有人尝试过这种规模的连接组件吗?如果是,我做错了什么?

slhcrj9b

slhcrj9b1#

正如我在上面的评论中所发布的,我使用spark上的map/reduce实现了连接组件。你可以在这里找到更多细节-https://www.linkedin.com/pulse/connected-component-using-map-reduce-apache-spark-shirish-kumar 麻省理工学院许可证下的源代码-https://github.com/kwartile/connected-component.

brgchamk

brgchamk2#

连通组件算法的伸缩性不是很好,其性能很大程度上取决于图的拓扑结构。你的边缘稀疏并不意味着你有小组件。一长串边非常稀疏(边数=顶点数-1),但graphx中实现的暴力算法效率不高(请参阅cc和pregel的源代码)。
以下是您可以尝试的内容(已排序,仅代码):
在Parquet地板(磁盘上)中检查顶点和边,然后再次加载它们以构建图形。当你的执行计划变得太大的时候,缓存有时并不能减少它。
以一种保持算法结果不变的方式变换图形。例如,您可以在代码中看到,algo在两个方向上传播信息(默认情况下应该是这样)。因此,如果有多条边连接相同的两个顶点,请将它们从应用算法的图形中过滤出来。
自己优化graphx代码(这非常简单),可以使用节省内存的通用优化(即每次迭代时在磁盘上设置检查点以避免oom),也可以使用特定于域的优化(类似于第2点)
如果您可以将graphx(这在某种程度上是遗留的)留在后面,那么可以考虑graphframes(包、博客)。我从没试过,所以我不知道它有没有cc。
我敢肯定你可以找到其他的Spark包的可能性,但也许你甚至会想使用一些以外的Spark。但这超出了问题的范围。
祝你好运!

相关问题