我在mapr集群中有两个位置,我的spark工作是从这两个端点加载数据。其中一个端点拥有大量数据,而另一个端点则相对较少。现在,当我像一个 .reduceByKey
或者 .groupByKey
出现异常:
"java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:286)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:286)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
at com.esotericsoftware.kryo.io.Output.close(Output.java:196)
at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:255)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:293)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1368)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:292)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:810)
at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"
现在,如果我将数据从一个位置复制到另一个位置,然后执行shuffle操作,则不会出现任何超时异常。为什么会有这样的行为?根据我的理解,洗牌操作将发生在rdd上,所以不管它从n个位置读取数据,它的行为应该是相似的。
如果我的理解有误,请纠正我。
1条答案
按热度按时间uurv41yg1#
这个问题的某些方面相当令人困惑。
首先,您指的是在mapr集群中的“2个位置”中拥有数据。你是说两个目录中的数据吗?或者你真的是说两个地方?或者你的意思是你实际上有两个簇?
另一个让人困惑的地方是,您显示了内存不足问题的堆栈跟踪,但随后会讨论某种类型的超时。你到底有什么问题?
一般来说,内存不足异常不太依赖于数据源,但与资源的过度投入有很大关系。如果你有很多任务使用的内存或cpu比它们少很多,但是你可能有一些任务使用的更多,那么过度承诺是有用的。过度投入资源可以让您安排此类程序,但如果多个大任务落在同一台机器上,则可能会导致严重问题。这也会导致一些随机行为,因此如果你不小心验证你的理论,你很容易跳到错误的结论。
在Yarn下,通常依赖于过度使用。如果你在kubernetes上运行,通常会有更高的精度。
不幸的是,在这个问题上没有更多的细节/一致性,这样的概括是最好的。