Heap space error and connection timeout in Spark on Databricks

o0lyfsai · posted 2021-07-09 · in Spark

I am running a Spark job on Azure Databricks (Spark 3.0.1, Scala 2.12). I have 3 worker nodes with 20 cores and 140 GB of memory each, and a driver node with 3 cores and 32 GB of memory. I pass the following configuration options to spark-submit:

...
"--conf","spark.dynamicAllocation.enabled=false",
"--conf","spark.kryoserializer.buffer.max=512m",
"--conf","spark.driver.maxResultSize=20g",
"--driver-memory=20g",
"--conf","spark.executor.memory=120g",
"--conf","spark.executor.heartbeatInterval=300s",
"--conf","spark.storage.blockManagerSlaveTimeoutMs=300s",
"--conf","spark.network.timeout=500s"
...
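
For reference, here is a minimal Scala sketch of the same settings applied when building the SparkSession. The option names are exactly the ones listed above, but the app name is made up, and the memory-related options normally only take effect when set at launch time, so treat this as an illustration rather than how the job is actually submitted:

import org.apache.spark.sql.SparkSession

// Illustration only: the same options as in the spark-submit arguments above,
// set programmatically. Driver/executor memory usually has to be fixed before
// the JVMs start, so those are normally left to spark-submit.
val spark = SparkSession.builder()
  .appName("heap-space-job")                                  // hypothetical name
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.kryoserializer.buffer.max", "512m")
  .config("spark.driver.maxResultSize", "20g")
  .config("spark.executor.memory", "120g")                    // effective only at launch time
  .config("spark.executor.heartbeatInterval", "300s")
  .config("spark.storage.blockManagerSlaveTimeoutMs", "300s")
  .config("spark.network.timeout", "500s")
  .getOrCreate()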

On the logic side: I create an initial RDD, perform some computations on it using flatMap, cache the resulting RDD, convert it to a DataFrame and save it to DBFS, and then run a takeOrdered action on that same (flatMap-produced) RDD. There are roughly 10^10 elements to process in parallel, split across 2000 partitions. A rough sketch of the pipeline follows, and after it the Log4j output from the run in which the takeOrdered stage fails:
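
A minimal sketch of what the job does, with placeholder names (the expand function, the column names, the number of results taken (100) and the dbfs:/tmp/results path are all made up; the real computation is more involved):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pipeline-sketch").getOrCreate()
import spark.implicits._
val sc = spark.sparkContext

// Placeholder for the real per-element computation done in flatMap.
def expand(x: Long): Iterator[(Long, Double)] = Iterator((x, x.toDouble))

// Initial RDD: roughly 10^10 elements spread over 2000 partitions.
val initial = sc.range(0L, 10000000000L, numSlices = 2000)

// flatMap + cache, as described above.
val expanded = initial.flatMap(expand).cache()

// Convert to a DataFrame and save it to DBFS (path is hypothetical).
expanded.toDF("id", "score").write.mode("overwrite").parquet("dbfs:/tmp/results")

// takeOrdered on the cached RDD -- this is the step that fails.
val top = expanded.takeOrdered(100)(Ordering.by[(Long, Double), Double](_._2))

Log4j output: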

21/04/02 13:10:46 INFO TaskSetManager: Starting task 27.1 in stage 1.0 (TID 2292, <worker2>, executor 14, partition 27, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 18.1 in stage 1.0 (TID 2293, <worker2>, executor 14, partition 18, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 45.1 in stage 1.0 (TID 2294, <worker2>, executor 14, partition 45, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 51.1 in stage 1.0 (TID 2295, <worker2>, executor 14, partition 51, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 2296, <worker2>, executor 14, partition 15, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 24.1 in stage 1.0 (TID 2297, <worker2>, executor 14, partition 24, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 33.1 in stage 1.0 (TID 2298, <worker2>, executor 14, partition 33, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 42.1 in stage 1.0 (TID 2299, <worker2>, executor 14, partition 42, NODE_LOCAL)
21/04/02 13:10:55 INFO BlockManagerInfo: Added rdd_1_1338 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.5 GiB)
21/04/02 13:10:57 WARN TransportChannelHandler: Exception in connection from /<worker2>:41478
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
21/04/02 13:11:02 INFO BlockManagerInfo: Added rdd_1_1339 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.5 GiB)
21/04/02 13:12:06 INFO BlockManagerInfo: Added rdd_1_1297 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.4 GiB)
21/04/02 13:12:20 INFO TaskSetManager: Starting task 1376.0 in stage 1.0 (TID 2300, <worker1>, executor 12, partition 1376, NODE_LOCAL)
21/04/02 13:12:20 WARN TaskSetManager: Lost task 1306.1 in stage 1.0 (TID 2248, <worker1>, executor 12): java.lang.OutOfMemoryError: Java heap space
    at com.google.common.collect.Ordering.leastOf(Ordering.java:678)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1502)
    at org.apache.spark.rdd.RDD$$Lambda$1305/103668203.apply(Unknown Source)
    at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2427)
    at org.apache.spark.SparkContext$$Lambda$1306/2078033457.apply(Unknown Source)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1129/1740040294.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

21/04/02 13:12:46 INFO TaskSetManager: Starting task 1306.2 in stage 1.0 (TID 2301, <worker1>, executor 12, partition 1306, NODE_LOCAL)
21/04/02 13:12:46 WARN TaskSetManager: Lost task 1299.1 in stage 1.0 (TID 2240, <worker1>, executor 12): java.lang.OutOfMemoryError: Java heap space
    at com.google.common.collect.Ordering.leastOf(Ordering.java:678)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1502)
    at org.apache.spark.rdd.RDD$$Lambda$1305/103668203.apply(Unknown Source)
    at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2427)
    at org.apache.spark.SparkContext$$Lambda$1306/2078033457.apply(Unknown Source)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1129/1740040294.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

21/04/02 13:12:46 INFO TaskSetManager: Starting task 1299.2 in stage 1.0 (TID 2302, <worker1>, executor 12, partition 1299, NODE_LOCAL)
21/04/02 13:12:46 INFO TaskSetManager: Lost task 1339.1 in stage 1.0 (TID 2259) on <worker1>, executor 12: java.lang.OutOfMemoryError (Java heap space) [duplicate 1]
21/04/02 13:12:49 INFO BlockManagerInfo: Added rdd_1_1303 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.4 GiB)
21/04/02 13:12:49 ERROR TaskSchedulerImpl: Lost executor 12 on <worker1>: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/02 13:12:49 WARN TaskSetManager: Lost task 1299.2 in stage 1.0 (TID 2302, <worker1>, executor 12): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/02 13:12:49 WARN TaskSetManager: Lost task 1340.1 in stage 1.0 (TID 2257, <worker1>, executor 12): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

In the logs I can see that worker1 still has plenty of free storage memory (free: 63.4 GiB), based on the following log line:

21/04/02 13:12:06 INFO BlockManagerInfo: Added rdd_1_1297 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.4 GiB)

So I cannot work out why worker1 throws the heap space error.
I also do not understand why worker2 raises the connection error ("Connection reset by peer"). Are the configured network timeout and heartbeat interval too small, or are there other optimizations that could be applied? For example, would a change along the following lines be the right direction?
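
(The values below are only an illustration of the kind of change I mean, not something I have verified; the Spark configuration docs say spark.executor.heartbeatInterval should be significantly less than spark.network.timeout, which my current 300s/500s settings only barely satisfy.)

"--conf","spark.executor.heartbeatInterval=60s",
"--conf","spark.network.timeout=800s",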
