I'm running into a strange issue where spark-submit processes hang around indefinitely and leak memory after their jobs have completed.
From the first three jobs submitted to the cluster in client mode, I consistently end up with three spark-submit processes still hanging around. For example:
root 1517 0.3 4.7 8412728 1532876 ? Sl 18:49 0:38 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell
root 1746 0.4 3.5 8152640 1132420 ? Sl 18:59 0:36 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell
root 2239 65.3 7.8 9743456 2527236 ? Sl 19:10 91:30 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell
The corresponding jobs show as FINISHED in the Spark UI, and according to the logs their sessions were closed and they exited. They no longer hold any worker resources, and subsequent jobs are able to acquire the maximum number of executors and run as expected. However, these three processes keep consuming memory at a slowly growing rate, which eventually leads to an OOM when a new driver is being allocated.
A thread dump of process 1517 above shows the following user threads (daemon threads omitted):
"Thread-4" #16 prio=5 os_prio=0 tid=0x00007f8fe4008000 nid=0x61c runnable [0x00007f9029227000]
java.lang.Thread.State: RUNNABLE
`at java.net.SocketInputStream.socketRead0(Native Method)`
`at java.net.SocketInputStream.socketRead(`[`SocketInputStream.java:116`](https://SocketInputStream.java:116)`)`
`at` [`java.net.SocketInputStream.read`](https://java.net.SocketInputStream.read)`(`[`SocketInputStream.java:171`](https://SocketInputStream.java:171)`)`
`at` [`java.net.SocketInputStream.read`](https://java.net.SocketInputStream.read)`(`[`SocketInputStream.java:141`](https://SocketInputStream.java:141)`)`
`at sun.nio.cs.StreamDecoder.readBytes(`[`StreamDecoder.java:284`](https://StreamDecoder.java:284)`)`
`at sun.nio.cs.StreamDecoder.implRead(`[`StreamDecoder.java:326`](https://StreamDecoder.java:326)`)`
`at` [`sun.nio.cs.StreamDecoder.read`](https://sun.nio.cs.StreamDecoder.read)`(`[`StreamDecoder.java:178`](https://StreamDecoder.java:178)`)`
`- locked <0x00000000800f8a88> (a java.io.InputStreamReader)`
`at` [`java.io.InputStreamReader.read`](https://java.io.InputStreamReader.read)`(`[`InputStreamReader.java:184`](https://InputStreamReader.java:184)`)`
`at java.io.BufferedReader.fill(`[`BufferedReader.java:161`](https://BufferedReader.java:161)`)`
`at java.io.BufferedReader.readLine(`[`BufferedReader.java:324`](https://BufferedReader.java:324)`)`
`- locked <0x00000000800f8a88> (a java.io.InputStreamReader)`
`at java.io.BufferedReader.readLine(`[`BufferedReader.java:389`](https://BufferedReader.java:389)`)`
`at` [`py4j.GatewayConnection.run`](https://py4j.GatewayConnection.run)`(`[`GatewayConnection.java:230`](https://GatewayConnection.java:230)`)`
`at` [`java.lang.Thread.run`](https://java.lang.Thread.run)`(`[`Thread.java:748`](https://Thread.java:748)`)`
Locked ownable synchronizers:
`- None`
"Thread-3" #15 prio=5 os_prio=0 tid=0x00007f905dab7000 nid=0x61b runnable [0x00007f9029328000]
java.lang.Thread.State: RUNNABLE
`at java.net.PlainSocketImpl.socketAccept(Native Method)`
`at java.net.AbstractPlainSocketImpl.accept(`[`AbstractPlainSocketImpl.java:409`](https://AbstractPlainSocketImpl.java:409)`)`
`at java.net.ServerSocket.implAccept(`[`ServerSocket.java:560`](https://ServerSocket.java:560)`)`
`at java.net.ServerSocket.accept(`[`ServerSocket.java:528`](https://ServerSocket.java:528)`)`
`at` [`py4j.GatewayServer.run`](https://py4j.GatewayServer.run)`(`[`GatewayServer.java:685`](https://GatewayServer.java:685)`)`
`at` [`java.lang.Thread.run`](https://java.lang.Thread.run)`(`[`Thread.java:748`](https://Thread.java:748)`)`
Locked ownable synchronizers:
`- None`
"pool-1-thread-1" #14 prio=5 os_prio=0 tid=0x00007f905daa5000 nid=0x61a waiting on condition [0x00007f902982c000]
java.lang.Thread.State: TIMED_WAITING (parking)
`at sun.misc.Unsafe.park(Native Method)`
`- parking to wait for <0x000000008011cda8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)`
`at java.util.concurrent.locks.LockSupport.parkNanos(`[`LockSupport.java:215`](https://LockSupport.java:215)`)`
`at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(`[`AbstractQueuedSynchronizer.java:2078`](https://AbstractQueuedSynchronizer.java:2078)`)`
`at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(`[`ScheduledThreadPoolExecutor.java:1093`](https://ScheduledThreadPoolExecutor.java:1093)`)`
`at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(`[`ScheduledThreadPoolExecutor.java:809`](https://ScheduledThreadPoolExecutor.java:809)`)`
`at java.util.concurrent.ThreadPoolExecutor.getTask(`[`ThreadPoolExecutor.java:1074`](https://ThreadPoolExecutor.java:1074)`)`
`at java.util.concurrent.ThreadPoolExecutor.runWorker(`[`ThreadPoolExecutor.java:1134`](https://ThreadPoolExecutor.java:1134)`)`
`at` [`java.util.concurrent.ThreadPoolExecutor$Worker.run`](https://java.util.concurrent.ThreadPoolExecutor$Worker.run)`(`[`ThreadPoolExecutor.java:624`](https://ThreadPoolExecutor.java:624)`)`
`at` [`java.lang.Thread.run`](https://java.lang.Thread.run)`(`[`Thread.java:748`](https://Thread.java:748)`)`
Locked ownable synchronizers:
`- None`
"main" #1 prio=5 os_prio=0 tid=0x00007f905c016800 nid=0x604 runnable [0x00007f9062b96000]
java.lang.Thread.State: RUNNABLE
`at java.io.FileInputStream.readBytes(Native Method)`
`at` [`java.io.FileInputStream.read`](https://java.io.FileInputStream.read)`(`[`FileInputStream.java:255`](https://FileInputStream.java:255)`)`
`at java.io.BufferedInputStream.fill(`[`BufferedInputStream.java:246`](https://BufferedInputStream.java:246)`)`
`at` [`java.io.BufferedInputStream.read`](https://java.io.BufferedInputStream.read)`(`[`BufferedInputStream.java:265`](https://BufferedInputStream.java:265)`)`
`- locked <0x0000000080189dc8> (a java.io.BufferedInputStream)`
`at org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:87)`
`at org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)`
`at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)`
`at sun.reflect.NativeMethodAccessorImpl.invoke(`[`NativeMethodAccessorImpl.java:62`](https://NativeMethodAccessorImpl.java:62)`)`
`at sun.reflect.DelegatingMethodAccessorImpl.invoke(`[`DelegatingMethodAccessorImpl.java:43`](https://DelegatingMethodAccessorImpl.java:43)`)`
`at java.lang.reflect.Method.invoke(`[`Method.java:498`](https://Method.java:498)`)`
`at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)`
`at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)`
`at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)`
`at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)`
`at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)`
`at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)`
`at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)`
`at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)`
Locked ownable synchronizers:
`- None`
"VM Thread" os_prio=0 tid=0x00007f905c08c000 nid=0x60d runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f905c02b800 nid=0x605 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f905c02d000 nid=0x606 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f905c02f000 nid=0x607 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f905c030800 nid=0x608 runnable
"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007f905c032800 nid=0x609 runnable
"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007f905c034000 nid=0x60a runnable
"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007f905c036000 nid=0x60b runnable
"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007f905c037800 nid=0x60c runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007f905c0e0800 nid=0x616 waiting on condition
I notice that the main thread is blocked reading from a file input stream in PythonGatewayServer, and the remaining threads appear to be blocked waiting on socket reads. It looks like an arbitrary number of Python gateways are being retained for some reason.
Any idea what could be causing this?
1 Answer
As @mazaneicha noticed, the spark-submit processes are running the pyspark REPL shell. This happens because the jobs are submitted from a Python runtime rather than through the proper spark-submit script.
For example, consider a class like this:
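The class itself wasn't preserved in this copy of the answer, so what follows is a minimal sketch of the kind of class being described, assuming it simply builds a SparkSession inside `start_job`; apart from `BatchTask` and `start_job`, every name below is hypothetical.

```python
from pyspark.sql import SparkSession

class BatchTask:
    """Hypothetical wrapper that submits a job by building its own SparkSession."""

    def __init__(self, master_url, app_name):
        self.master_url = master_url
        self.app_name = app_name

    def start_job(self):
        # getOrCreate() from a plain Python process makes py4j launch a child
        # JVM via spark-submit (the "pyspark-shell" process in the ps output above).
        spark = (SparkSession.builder
                 .master(self.master_url)
                 .appName(self.app_name)
                 .getOrCreate())
        try:
            # ... actual job logic would go here ...
            spark.range(100).count()
        finally:
            # stop() shuts down the SparkContext, but the gateway JVM keeps running
            spark.stop()
```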
It turns out that if you instantiate `BatchTask` and call `start_job` from any Python runtime on a machine with pyspark on the PYTHONPATH, a spark-submit process is started running the pyspark REPL, which then executes the application driver. That REPL never exits and gets reused for subsequent jobs submitted the same way, accumulating all the daemon threads added by each job until the driver's memory limit is eventually exhausted and it crashes. The solution is not to submit Spark applications this way, but to use the spark-submit script instead.
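For contrast, here is a minimal sketch of the recommended pattern (the script name, master URL and job logic are illustrative): keep the driver logic in a standalone script and launch it with the spark-submit script, so the JVM exits as soon as the driver finishes.

```python
# batch_job.py -- launched with the spark-submit script, e.g.:
#   spark-submit --master spark://3c520b0c6d6e:7077 batch_job.py
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("batch-job").getOrCreate()
    try:
        # ... job logic ...
        spark.range(100).count()
    finally:
        # spark-submit terminates once this script returns, so no
        # pyspark-shell process is left hanging around.
        spark.stop()
```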