I am running a Spark job on Azure Databricks (Spark 3.0.1, Scala 2.12). I have 3 worker nodes, each with 20 cores and 140 GB of memory, and a driver node with 3 cores and 32 GB of memory. I use the following configuration options in spark-submit:
...
"--conf","spark.dynamicAllocation.enabled=false",
"--conf","spark.kryoserializer.buffer.max=512m",
"--conf","spark.driver.maxResultSize=20g",
"--driver-memory=20g",
"--conf","spark.executor.memory=120g",
"--conf","spark.executor.heartbeatInterval=300s",
"--conf","spark.storage.blockManagerSlaveTimeoutMs=300s",
"--conf","spark.network.timeout=500s"
...
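For readability, here are the flags above expressed as the equivalent programmatic configuration. This is only a sketch of what the options amount to, not the actual submission code:

import org.apache.spark.sql.SparkSession

// Mirrors the spark-submit flags above (sketch only).
val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.kryoserializer.buffer.max", "512m")
  .config("spark.driver.maxResultSize", "20g")
  .config("spark.driver.memory", "20g") // effective only when set before the driver JVM starts, i.e. via spark-submit
  .config("spark.executor.memory", "120g")
  .config("spark.executor.heartbeatInterval", "300s")
  .config("spark.storage.blockManagerSlaveTimeoutMs", "300s")
  .config("spark.network.timeout", "500s")
  .getOrCreate()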
On the logic side: I create an initial RDD, run some computation on it with flatMap, cache the resulting RDD, convert it to a DataFrame and save it to DBFS, and then run a takeOrdered action on the (flatMapped) RDD. There are roughly 10^10 elements, computed in parallel across 2000 partitions. A minimal sketch of the pipeline follows.
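To make the flow concrete, here is a hedged sketch of the pipeline; compute, k, and the output path/format are placeholders rather than the actual job code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val sc = spark.sparkContext

def compute(x: Long): Iterator[Long] = Iterator(x) // stands in for the real per-element work
val k = 100                                        // stands in for the real takeOrdered size

// ~10^10 elements across 2000 partitions
val initial = sc.range(0L, 10000000000L, step = 1, numSlices = 2000)
val flatMapped = initial.flatMap(compute).cache()

// Persist as a DataFrame on DBFS (the output format here is an assumption)
flatMapped.toDF("value").write.parquet("dbfs:/tmp/output")

// The failing action: take the k smallest elements
val top = flatMapped.takeOrdered(k)

The takeOrdered stage fails with the following error (Log4j output):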
21/04/02 13:10:46 INFO TaskSetManager: Starting task 27.1 in stage 1.0 (TID 2292, <worker2>, executor 14, partition 27, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 18.1 in stage 1.0 (TID 2293, <worker2>, executor 14, partition 18, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 45.1 in stage 1.0 (TID 2294, <worker2>, executor 14, partition 45, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 51.1 in stage 1.0 (TID 2295, <worker2>, executor 14, partition 51, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 2296, <worker2>, executor 14, partition 15, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 24.1 in stage 1.0 (TID 2297, <worker2>, executor 14, partition 24, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 33.1 in stage 1.0 (TID 2298, <worker2>, executor 14, partition 33, NODE_LOCAL)
21/04/02 13:10:46 INFO TaskSetManager: Starting task 42.1 in stage 1.0 (TID 2299, <worker2>, executor 14, partition 42, NODE_LOCAL)
21/04/02 13:10:55 INFO BlockManagerInfo: Added rdd_1_1338 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.5 GiB)
21/04/02 13:10:57 WARN TransportChannelHandler: Exception in connection from /<worker2>:41478
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
21/04/02 13:11:02 INFO BlockManagerInfo: Added rdd_1_1339 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.5 GiB)
21/04/02 13:12:06 INFO BlockManagerInfo: Added rdd_1_1297 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.4 GiB)
21/04/02 13:12:20 INFO TaskSetManager: Starting task 1376.0 in stage 1.0 (TID 2300, <worker1>, executor 12, partition 1376, NODE_LOCAL)
21/04/02 13:12:20 WARN TaskSetManager: Lost task 1306.1 in stage 1.0 (TID 2248, <worker1>, executor 12): java.lang.OutOfMemoryError: Java heap space
at com.google.common.collect.Ordering.leastOf(Ordering.java:678)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1502)
at org.apache.spark.rdd.RDD$$Lambda$1305/103668203.apply(Unknown Source)
at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2427)
at org.apache.spark.SparkContext$$Lambda$1306/2078033457.apply(Unknown Source)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1129/1740040294.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/04/02 13:12:46 INFO TaskSetManager: Starting task 1306.2 in stage 1.0 (TID 2301, <worker1>, executor 12, partition 1306, NODE_LOCAL)
21/04/02 13:12:46 WARN TaskSetManager: Lost task 1299.1 in stage 1.0 (TID 2240, <worker1>, executor 12): java.lang.OutOfMemoryError: Java heap space
at com.google.common.collect.Ordering.leastOf(Ordering.java:678)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1502)
at org.apache.spark.rdd.RDD$$Lambda$1305/103668203.apply(Unknown Source)
at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2427)
at org.apache.spark.SparkContext$$Lambda$1306/2078033457.apply(Unknown Source)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1129/1740040294.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/04/02 13:12:46 INFO TaskSetManager: Starting task 1299.2 in stage 1.0 (TID 2302, <worker1>, executor 12, partition 1299, NODE_LOCAL)
21/04/02 13:12:46 INFO TaskSetManager: Lost task 1339.1 in stage 1.0 (TID 2259) on <worker1>, executor 12: java.lang.OutOfMemoryError (Java heap space) [duplicate 1]
21/04/02 13:12:49 INFO BlockManagerInfo: Added rdd_1_1303 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.4 GiB)
21/04/02 13:12:49 ERROR TaskSchedulerImpl: Lost executor 12 on <worker1>: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/02 13:12:49 WARN TaskSetManager: Lost task 1299.2 in stage 1.0 (TID 2302, <worker1>, executor 12): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/02 13:12:49 WARN TaskSetManager: Lost task 1340.1 in stage 1.0 (TID 2257, <worker1>, executor 12): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
In the logs, I can see that worker1 still has roughly 63 GiB of free memory (e.g. free: 63.4 GiB), based on log lines such as:
21/04/02 13:12:06 INFO BlockManagerInfo: Added rdd_1_1297 in memory on <worker1>:38513 (size: 38.1 MiB, free: 63.4 GiB)
So I cannot work out why worker1 is throwing the heap-space error.
Moreover, I also do not understand why worker2 raises the connection reset / timeout error. Are the configured network timeout and heartbeat interval too small, or are there other optimizations that could be applied? A sketch of the kind of adjustment I mean is shown below.
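For reference, the adjustment the question is asking about would look like this; the values are illustrative guesses, not tested recommendations (the one hard constraint I know of is that spark.executor.heartbeatInterval must stay well below spark.network.timeout):

"--conf","spark.network.timeout=800s",
"--conf","spark.executor.heartbeatInterval=60s",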