stage只有一个正在运行的任务导致磁盘溢出

jhkqcmku  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(285)

在一个数据集上运行给定的sql之后,我们正在缓存它。代码的流程是
将数据集加载到sparkDataframe中
运行sql
使用重新划分结果数据集 dataset.repartition() 并使用选项缓存数据集 MEMORY_AND_DISK .
要运行的sql是:

`select *,CASE WHEN event_type in ('value1','value2') THEN EXISTING_COLUMN_1 ELSE '0' END AS NEW_COLUMN_NAME from events`

sql的结果是另一个Dataframe结果,我们使用它重新分区 result.repartion(300) 并使用persist函数对其进行缓存。参数 spark.sql.shuffle.partitions 设定为2001年。
现在,问题是,在缓存时,有三个阶段:[[ stages ][1]][1]
阶段1无序写入数据
第二阶段是洗牌。在这些阶段没有溢出到磁盘/内存
阶段3,其中只有一个任务正在运行并导致磁盘和内存溢出。这已经成为我们应用的瓶颈[ stage3 ][2]][2] [![在此处输入图像描述][3]][3]
为什么这个阶段只有一个任务,即将数据溢出到磁盘/内存?请帮助我们理解这个问题以及我们的错误所在。
编辑1:带有sql的函数\u sqltransformation.scala的代码段。

val events=spark.read.format("orc").load("path")
val table=spark.sql("""select *,
                     CASE 
                     WHEN event_type in ('value1','value2') THEN 
                     EXISTING_COLUMN_1 ELSE '0'
                     END AS NEW_COLUMN_NAME from events""".stripMargin)
table = table.repartition(300)
table.createOrReplaceTempView("second_table_name")
table.persist(StorageLevel.MEMORY_AND_DISK)

  [1]: https://i.stack.imgur.com/RcAZt.png
  [2]: https://i.stack.imgur.com/l723E.png
  [3]: https://i.stack.imgur.com/XUE3G.png

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题