databricks sparkeexception超过spark.driver.maxresultsize

neskvpey  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(570)

我正在azure databricks dbr 7.3 lts、spark 3.0.1、scala 2.12上运行以下代码,这些代码是在标准\u e4as \u v4(32.0 gb内存,4核,1 dbu)虚拟机和标准\u ds5 \u v2(56.0 gb内存,16核,3 dbu)类型驱动程序的(20到35)个工作线程集群上运行的
其目的是处理约5.5 tb的数据
我面临以下异常:“org.apache.spark.sparkexception:由于阶段失败而中止作业:在处理57071个任务中的1163个任务(148.4 gib)后,1165个任务(4.0 gib)的序列化结果的总大小大于spark.driver.maxresultsize 4.0 gib”,处理数据的时间为6.1分钟
我不收集或传输数据到驱动程序,分区数据会导致这个问题吗?如果是这样:
有更好的分割方法吗?
如何解决这个问题?
代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)

val jsonDF = spark.read.json("/mnt/myfile")

val res = jsonDF
      .withColumn("row", row_number.over(w))
      .where($"row" === 1)
      .drop("row")

res.write.json("/mnt/myfile/spark_output")

然后,我只尝试在没有转换的情况下再次加载和写入数据,并面临同样的问题,代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val jsonDF = spark.read.json("/mnt/myfile")

jsonDF.write.json("/mnt/myfile/spark_output")
irlmq6kh

irlmq6kh1#

这个 write 方法将所有分区的写入操作的结果发送回驱动程序,由于发生了大量数据(和许多分区),请尝试增加 spark.driver.maxResultSize 看看它的效果。
根据文件:
每个spark操作(例如collect)的所有分区的序列化结果的总大小限制(字节)。应至少为1m,或0表示无限。如果总大小超过此限制,作业将被中止。上限可能会导致驱动程序内存不足错误(取决于spark.driver.memory和jvm中对象的内存开销)。设置适当的限制可以保护驱动程序免受内存不足错误的影响。
这也很有用。

相关问题