My pyspark job (spark 2.4.1) seems to run fine about 10% of the time, while the rest of the time it appears to hang forever on a single task, and I can't really understand what is happening. Here is what I do in my pyspark code:
df = ss.read.parquet(...)
df2 = df.withColumn("A", my_python_udf(sf.col("position.latitude")))
print(df2.groupBy(sf.spark_partition_id()).count().agg(sf.min("count"), sf.max("count"), sf.avg("count")).toPandas())
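For context, this is roughly the shape of the whole job as a minimal, self-contained sketch; the session setup, the input path, and the UDF body are placeholders I added for illustration, not my real code:

from pyspark.sql import SparkSession, functions as sf
from pyspark.sql.types import DoubleType

ss = SparkSession.builder.getOrCreate()

# hypothetical stand-in for the real UDF: any plain python function wrapped this
# way is executed row by row in a separate python worker process, not in the JVM
@sf.udf(DoubleType())
def my_python_udf(latitude):
    return float(latitude) if latitude is not None else None

df = ss.read.parquet("/path/to/input")  # placeholder path
df2 = df.withColumn("A", my_python_udf(sf.col("position.latitude")))

# per-partition row counts, then min/max/avg of those counts across partitions
stats = (df2.groupBy(sf.spark_partition_id())
            .count()
            .agg(sf.min("count"), sf.max("count"), sf.avg("count")))
print(stats.toPandas())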
I seem to be stuck forever on the evaluation of the toPandas call. When I check the executors tab, only one executor has a runnable task, with the following call stack:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
java.io.BufferedInputStream.read(BufferedInputStream.java:345) => holding Monitor(java.io.BufferedInputStream@1118259716)
java.io.DataInputStream.readFully(DataInputStream.java:195)
java.io.DataInputStream.readFully(DataInputStream.java:169)
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:74)
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
org.apache.spark.scheduler.Task.run(Task.scala:121)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
I have a few questions:
Why does the call stack seem to be busy with UDF evaluation, which shouldn't be needed for my computation? (see the plan-inspection sketch after these questions)
What is actually going on here? From the call stack I can't tell whether that thread is deadlocked or actively doing work.
How do I fix this?
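Regarding the first question, one way to check whether the python UDF really ends up in the executed plan for this aggregation is to print the plan; a quick sketch, assuming df2 and sf from the snippet above:

(df2.groupBy(sf.spark_partition_id())
    .count()
    .agg(sf.min("count"), sf.max("count"), sf.avg("count"))
    .explain(True))  # look for BatchEvalPythonExec / the python UDF in the physical plan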
EDIT: I also have two executors failing with the following error:
java.io.IOException: expected more bytes in input stream
    at net.razorvine.pickle.PickleUtils.readbytes_into(PickleUtils.java:75)
    at net.razorvine.pickle.PickleUtils.readbytes(PickleUtils.java:55)
    at net.razorvine.pickle.Unpickler.load_binunicode(Unpickler.java:473)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:190)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:112)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:90)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
This leads me to believe that something outside my own code is going wrong.
1 Answer
In my case, my pyspark jobs always (not randomly) got stuck in PythonUDFRunner waiting for data. I found out it was waiting for data from the pyspark daemon, which is used to launch the python code (the python UDF) without wasting a lot of memory while forking the python process (the UDF) from the java process (spark). For more information, see this link: pyspark daemon
So, roughly speaking, the python UDF is serialized by spark and sent to this daemon, which is in charge of running the python code.
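Just to illustrate that serialization step (my own rough sketch, not code from spark; plain pickle stands in for the bundled cloudpickle that spark actually uses, and the UDF body is hypothetical):

import pickle  # spark ships a bundled cloudpickle, which can also serialize closures

def my_python_udf_fn(latitude):  # hypothetical UDF body
    return latitude is not None

payload = pickle.dumps(my_python_udf_fn)  # driver side: serialize the function
restored = pickle.loads(payload)          # worker side: deserialize it back into a callable
print(restored(48.85))                    # the worker then applies it to each row's value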
This daemon is launched from code shipped in a file called pyspark.zip.
In my case, for reasons I don't want to go into ;) ;), this pyspark.zip came from spark 2.3.3, while spark itself was running as spark 2.4.5. I replaced pyspark.zip with the one from spark 2.4.5 and everything started running successfully. I don't think you have exactly the same problem I had, but maybe it gives you some ideas about what is going on in your setup.
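A quick way to check for that kind of mismatch in your own setup (my sketch, not from the answer) is to compare the driver's version with whatever pyspark the executors actually import:

import pyspark
from pyspark.sql import SparkSession, functions as sf
from pyspark.sql.types import StringType

ss = SparkSession.builder.getOrCreate()
print("driver:", ss.version, pyspark.__version__)

@sf.udf(StringType())
def worker_pyspark_version(_):
    import pyspark as worker_pyspark  # resolved from the pyspark.zip shipped to the executors
    return worker_pyspark.__version__

# one row is enough; the UDF runs inside an executor's python worker
print(ss.range(1).select(worker_pyspark_version("id")).collect())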