FileNotFoundException: Spark save fails. Cannot clear cache from Dataset[T] avro

rdrgkggo posted on 2021-05-27 in Spark

I get the error below when saving a DataFrame in Avro for the second time. If I delete sub_folder/part-00000--c000.avro after saving and then try to save the same dataset again, I get the following:

FileNotFoundException: File /.../main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

If I delete not only from sub_folder but also from main_folder, the problem does not occur, but I cannot afford that.
The problem does not occur when saving the dataset in any other format.
Saving an empty dataset does not cause the error.
The exception suggests refreshing a table, but as the output of sparkSession.catalog.listTables().show() shows, there are no tables to refresh:

+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

The previously saved DataFrame looks like this. The application is supposed to update it:

+--------------------+--------------------+
|              Col1  |               Col2 |
+--------------------+--------------------+
|[123456, , ABC, [...|[[v1CK, RAWNAME1_,..|
|[123456, , ABC, [...|[[BG8M, RAWNAME2_...|
+--------------------+--------------------+
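
For context, a minimal sketch of the update cycle that produces the error; the path and the transformation here are placeholders, the real read and write code is shown further below:

import org.apache.spark.sql.SaveMode

// Placeholder path and transformation; the actual helpers appear further down.
val path = "/DATA/XXX/main_folder/sub_folder"

val existing = sparkSession.read.format("avro").load(path) // read what was saved last time
val updated  = existing // stand-in for the application's actual update logic

// Overwriting the same folder the data was just read from is what fails on the
// second run with the FileNotFoundException above.
updated.write
  .format("avro")
  .mode(SaveMode.Overwrite)
  .save(path)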

To me this looks like a clear caching problem. However, all attempts to clear the cache have failed:

dataset.write
      .format("avro")
      .option("path", path)
      .mode(SaveMode.Overwrite) // Any save mode gives the same error
      .save()

// Moving this either before or after saving doesn't help.
sparkSession.catalog.clearCache()  

// This will not un-persist any cached data that is built upon this Dataset.
dataset.cache().unpersist()
dataset.unpersist()
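
(As an aside, not one of the attempts above: Spark also exposes a path-based refresh for file sources, which is the closest equivalent of the REFRESH TABLE hint when there is no catalog table. A minimal sketch, assuming path is the avro folder being overwritten:)

// Path-based cache invalidation for file-based sources (no catalog table needed).
sparkSession.catalog.refreshByPath(path)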

This is how I read the dataset:

private def doReadFromPath[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {

    val df = sparkSession.read
      .format("avro")
      .load(path)
      .select("*")

    df.as[T]
  }
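
(Another aside, not from the original post: an alternative way to make the returned Dataset independent of the files on disk is to eagerly truncate its lineage, e.g. with localCheckpoint, available since Spark 2.3. A minimal sketch, assuming the same surrounding class and implicits as the helper above:)

private def doReadFromPathMaterialized[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {

  // Eagerly materialize and truncate the lineage so that a later overwrite of
  // `path` does not need to re-read the (by then deleted) source files.
  sparkSession.read
    .format("avro")
    .load(path)
    .as[T]
    .localCheckpoint(eager = true) // Spark 2.3+; stores the partitions on the executors
}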

Finally, this is the stack trace. Thanks a lot for your help:

ERROR [task-result-getter-3] (Logging.scala:70) - Task 0 in stage 9.0 failed 1 times; aborting job
ERROR [main] (Logging.scala:91) - Aborting job 150de02a-ac6a-4d42-824d-5db44a98c19a.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/DATA/XXX/main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 10 more

q3qa4bjr1#

Thanks a lot, ram ghadiyaram!
Solution 2 solved my problem, but only on my local Ubuntu. When I tested on HDFS, the problem was still there.
Solution 1 is the definitive fix. My code now looks like this:

private def doWriteToPath[T <: Product with Serializable: TypeTag: ClassTag](dataset: Dataset[T], path: String): Unit = {

  // clear any previously cached avro
  sparkSession.catalog.clearCache()

  // update the cache for this particular dataset, and trigger an action
  dataset.cache().show(1)

  dataset.write
    .format("avro")
    .option("path", path)
    .mode(SaveMode.Overwrite)
    .save()
}
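
A hypothetical call site tying the two helpers together (MyRecord and applyUpdates are placeholders, not from the original code):

// MyRecord stands for the generated Avro SpecificRecord type used by the application.
val path = "/DATA/XXX/main_folder/sub_folder"

val current: Dataset[MyRecord] = doReadFromPath[MyRecord](path)
val updated: Dataset[MyRecord] = applyUpdates(current) // application-specific update step

// clearCache + cache().show(1) inside the helper avoid the FileNotFoundException.
doWriteToPath(updated, path)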

That said: I did check that post while trying to solve this problem, without success. I ruled it out as the cause for the following reasons:

  • I created the temp folder under "main_folder", named "sub_folder_temp", and saving still failed.
  • Saving the same non-empty dataset in the same path, but in JSON format, works without the workaround discussed here.
  • Saving an empty dataset of the same type [T] in the same path works without the workaround discussed here.


vxqlmq5t2#

  • Reading from and writing to the same location is what causes this problem. It has also been discussed on this forum, along with my answer.

The error message below is misleading; the actual problem is reading from and writing to the same location.

You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL

I will give an example different from yours (using Parquet; in your case it is Avro).
There are two options for you.
Option 1 (cache and show) works like this:

import org.apache.spark.sql.functions._
import spark.implicits._ // for toDS / toDF (auto-imported in spark-shell)

val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

df.show(false)

df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back again

val df2 = df1.withColumn("cleanup", lit("Rod want to cleanup")) // like you said you want to clean it up

// THE 2 STEPS BELOW ARE IMPORTANT: `cache` plus a light action such as `show`,
// without which the FileNotFoundException will come.

df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // action

df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("Rod saved in the same directory he read from; the final records after cleanup are:")
df2.show(false)

Option 2:
1) Save the DataFrame to a different avro folder.
2) Delete the old avro folder.
3) Finally, rename the newly created avro folder to the old name. That will work (see the sketch below).
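
A minimal sketch of option 2 using the Hadoop FileSystem API (the folder names are placeholders and error handling is omitted):

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only; paths are placeholders.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val finalDir = new Path(".../main_folder/sub_folder")     // folder the data was read from
val tempDir  = new Path(".../main_folder/sub_folder_tmp") // 1) save to a different folder

df2.repartition(1).write.format("avro").mode("overwrite").save(tempDir.toString)
fs.delete(finalDir, true)    // 2) delete the old avro folder
fs.rename(tempDir, finalDir) // 3) rename the new folder to the old name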
