PySpark can do everything except save the result, due to an out-of-heap (Java heap space) error

f87krz0w asked on 2023-04-07 (Apache)

This is the first time I've had a dataset large enough that my usual tools no longer work, so I dove into Spark. What I don't understand is why I run out of heap just saving a simple CSV file.
I have a pipeline like this, and it works fine:

# Imports assumed by the snippet (SQL is an alias for pyspark.sql.functions);
# `spark`, `effective_date` and the 'filename' column are defined earlier / in the data.
from pyspark.sql import functions as SQL
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

spark.read\
    .format("parquet")\
    .load('/mnt/data/share/parquets/quotes-*.parquet')\
    .withColumn("symbol", SQL.regexp_extract('filename', r'\/([^\/]*).csv$', 1))\
    .withColumn("DDate", col("Date").cast(DateType()))\
    .select(["symbol", "DDate", "value"])\
    .filter(col("DDate") >= effective_date)\
    .groupBy("symbol").pivot("DDate").sum("value")\
    .count()

>>> 325073

But when I change the last line from .count() to .repartition(1).write.csv("/tmp/test.csv"), I get an out-of-heap error.
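For clarity, the failing variant is the same pipeline with only the final action changed; the resulting stack trace follows:

# Same pipeline, but the final action collapses the result to one partition
# and writes it out as CSV instead of counting rows. This is the step that
# triggers the heap error below.
spark.read\
    .format("parquet")\
    .load('/mnt/data/share/parquets/quotes-*.parquet')\
    .withColumn("symbol", SQL.regexp_extract('filename', r'\/([^\/]*).csv$', 1))\
    .withColumn("DDate", col("Date").cast(DateType()))\
    .select(["symbol", "DDate", "value"])\
    .filter(col("DDate") >= effective_date)\
    .groupBy("symbol").pivot("DDate").sum("value")\
    .repartition(1)\
    .write.csv("/tmp/test.csv")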

23/04/05 22:30:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
[148.576s][warning][gc,alloc] Executor task launch worker for task 12.0 in stage 10.0 (TID 662): Retried waiting for GCLocker too often allocating 131074 words
23/04/05 22:30:27 ERROR Executor: Exception in task 12.0 in stage 10.0 (TID 662)
java.lang.OutOfMemoryError: Java heap space
23/04/05 22:30:27 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 12.0 in stage 10.0 (TID 662),5,main]
java.lang.OutOfMemoryError: Java heap space
23/04/05 22:30:27 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@4fefe566 rejected from java.util.concurrent.ThreadPoolExecutor@2558895b[Shutting down, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 656]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

What I don't understand is that a CSV file should be writable row by row, and I doubt the rows are that large. I'm using the following settings:

spark.driver.memory              4g
spark.executor.memory            12g
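
A minimal sketch of supplying the same values when building the session from a standalone PySpark script (the application name is made up; note that spark.driver.memory only takes effect if it is set before the driver JVM starts, e.g. here or via spark-submit --driver-memory, not from an already-running shell):

from pyspark.sql import SparkSession

# Equivalent to the two spark-defaults.conf entries above.
spark = SparkSession.builder \
    .appName("quotes-pivot") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "12g") \
    .getOrCreate()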

Increasing or decreasing the number of partitions (or even omitting repartition(...) entirely) doesn't change anything.

pqwbnv8z (answer 1)

Check whether any data spill is happening at any stage of your application. If so, try increasing the number of shuffle partitions or the executor memory. If the problem persists after applying both, then you are facing a data skew problem. Try resolving the skew with techniques such as salting, broadcast joins, etc.
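
Minimal sketches of two of those suggestions (the DataFrame name `df` and the bucket count are placeholders; adapt them to the actual pipeline):

from pyspark.sql import functions as F

# 1) More shuffle partitions, so each shuffled block stays smaller.
spark.conf.set("spark.sql.shuffle.partitions", "400")

# 2) Salting: spread a skewed grouping key over n_buckets random buckets,
#    aggregate per (key, bucket), then aggregate again per key.
#    `df` stands for the DataFrame with the symbol/value columns.
n_buckets = 16
salted = (df
    .withColumn("salt", (F.rand() * n_buckets).cast("int"))
    .groupBy("symbol", "salt")
    .agg(F.sum("value").alias("partial_sum"))
    .groupBy("symbol")
    .agg(F.sum("partial_sum").alias("total_value")))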
