This is the first time I've had a dataset big enough that my usual tools no longer work, so I dug into Spark. What I don't understand is why I run out of heap just saving a simple CSV file.
I have a pipeline like this, and it works fine:
from pyspark.sql import functions as SQL
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

spark.read\
    .format("parquet")\
    .load('/mnt/data/share/parquets/quotes-*.parquet')\
    .withColumn("symbol", SQL.regexp_extract('filename', r'\/([^\/]*).csv$', 1))\
    .withColumn("DDate", col("Date").cast(DateType()))\
    .select(["symbol", "DDate", "value"])\
    .filter(col("DDate") >= effective_date)\
    .groupBy("symbol").pivot("DDate").sum("value")\
    .count()
>>> 325073
But when I change the last line from count() to .repartition(1).write.csv("/tmp/test.csv"), I get a Java heap space error:
23/04/05 22:30:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
[148.576s][warning][gc,alloc] Executor task launch worker for task 12.0 in stage 10.0 (TID 662): Retried waiting for GCLocker too often allocating 131074 words
23/04/05 22:30:27 ERROR Executor: Exception in task 12.0 in stage 10.0 (TID 662)
java.lang.OutOfMemoryError: Java heap space
23/04/05 22:30:27 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 12.0 in stage 10.0 (TID 662),5,main]
java.lang.OutOfMemoryError: Java heap space
23/04/05 22:30:27 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@4fefe566 rejected from java.util.concurrent.ThreadPoolExecutor@2558895b[Shutting down, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 656]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
What I don't understand is that a CSV file should be writable row by row, and I doubt any single row is that large. I am using the following settings:
spark.driver.memory 4g
spark.executor.memory 12g
Increasing or decreasing the number of partitions (or even omitting repartition(..) entirely) doesn't change anything either.
1 Answer
In your application, check whether any data spill is happening in any stage. If it is, try increasing the number of shuffle partitions or increasing the executor memory. If the problem still persists after applying both, then what you are facing is a data skew problem. Try the usual techniques for resolving skew, such as salting the key or broadcasting.
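A rough sketch of those knobs, assuming a DataFrame df with the symbol/value columns from the question (df, N and the numbers are placeholders to tune, not part of the original pipeline):

from pyspark.sql import functions as F

# More, smaller shuffle tasks so each one fits in executor memory
# (spark.sql.shuffle.partitions defaults to 200).
spark.conf.set("spark.sql.shuffle.partitions", "400")

# Heap sizes cannot be changed from inside a running application; set them
# in spark-defaults.conf (as in the question) or on spark-submit, e.g.
#   spark-submit --driver-memory 8g --executor-memory 16g ...

# Salting sketch for a skewed groupBy: spread each hot key over N sub-keys,
# pre-aggregate per (key, salt), then aggregate again without the salt.
N = 16
salted = df.withColumn("salt", (F.rand() * N).cast("int"))
partial = salted.groupBy("symbol", "salt").agg(F.sum("value").alias("partial_sum"))
result = partial.groupBy("symbol").agg(F.sum("partial_sum").alias("total_value"))

# Broadcast hints apply to joins, e.g. big.join(F.broadcast(small), "key");
# there is no join in this particular pipeline.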