我使用以下命令在本地模式下使用spark 2.0调用pyspark:
pyspark --executor-memory 4g --driver-memory 4g
正在从tsv文件读取输入Dataframe,它有580 k x 28列。我在Dataframe上做了一些操作,然后我试图将它导出到一个tsv文件,我得到了这个错误。
df.coalesce(1).write.save("sample.tsv",format = "csv",header = 'true', delimiter = '\t')
任何指针如何摆脱这个错误。我可以很容易地显示df或计算行数。
输出Dataframe是3100行23列
错误:
Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 1073, localhost): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 100 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:129)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.fetchNextRow(WindowExec.scala:300)
at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.<init>(WindowExec.scala:309)
at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:289)
at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:288)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
... 8 more
Driver stacktrace:
5条答案
按热度按时间qvtsj1bj1#
我的问题是
coalesce()
. 我所做的是导出文件不使用coalesce()
但是用Parquet地板代替df.write.parquet("testP")
. 然后读回文件并用coalesce(1)
.希望对你也有用。
ikfrs5lh2#
我认为这个问题的原因是coalesce(),尽管它避免了完全洗牌(就像重新分区一样),但它必须将数据压缩到请求的分区数。
在这里,您请求将所有数据放入一个分区,因此一个任务(并且只有一个任务)必须处理所有数据,这可能导致其容器受到内存限制。
所以,要么要求分区多于1个,要么避免
coalesce()
在这种情况下。否则,您可以尝试以下链接中提供的解决方案,以增加内存配置:
spark java.lang.outofmemoryerror:java堆空间
按键分组时,spark内存不足
cyej8jka3#
就我而言,更换
coalesce(1)
与repartition(1)
工作。olhwl3o24#
如其他答案所述,使用
repartition(1)
而不是coalesce(1)
. 原因是重分区(1)将确保上游处理并行完成(多个任务/分区),而不是只在一个执行器上完成。引用dataset.coalesce()spark文档:
但是,如果要进行剧烈合并(例如,使numpartitions=1),则这可能会导致计算在比您希望的节点更少的节点上进行(例如,在numpartitions=1的情况下为一个节点)。为了避免这种情况,可以调用重分区(1)。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
t2a7ltrp5#
就我而言,司机比工人小。通过使驱动器变大解决了这个问题。