I am getting the following error the second time I save a DataFrame in Avro. If I delete sub_folder/part-00000--c000.avro after saving and then try to save the same dataset, I get this:
FileNotFoundException: File /.../main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
If I delete not only from sub_folder but also from main_folder, the problem does not happen, but I can't afford that.
The problem does not happen when trying to save the dataset in any other format.
Saving an empty dataset does not cause the error.
The error message suggests that a table needs to be refreshed, but as sparkSession.catalog.listTables().show() shows, there are no tables to refresh:
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+
The previously saved DataFrame looks like this. The application is supposed to update it:
+--------------------+--------------------+
| Col1 | Col2 |
+--------------------+--------------------+
|[123456, , ABC, [...|[[v1CK, RAWNAME1_,..|
|[123456, , ABC, [...|[[BG8M, RAWNAME2_...|
+--------------------+--------------------+
To me this looks like an obvious caching problem. However, all attempts to clear the cache have failed:
dataset.write
.format("avro")
.option("path", path)
.mode(SaveMode.Overwrite) // Any save mode gives the same error
.save()
// Moving this either before or after saving doesn't help.
sparkSession.catalog.clearCache()
// This will not un-persist any cached data that is built upon this Dataset.
dataset.cache().unpersist()
dataset.unpersist()
This is how I read the dataset:
private def doReadFromPath[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {
  val df = sparkSession.read
    .format("avro")
    .load(path)
    .select("*")
  df.as[T]
}
Finally, here is the stack trace. Thanks a lot for your help:
ERROR [task-result-getter-3] (Logging.scala:70) - Task 0 in stage 9.0 failed 1 times; aborting job
ERROR [main] (Logging.scala:91) - Aborting job 150de02a-ac6a-4d42-824d-5db44a98c19a.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/DATA/XXX/main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
... 10 more
2 Answers
q3qa4bjr1#
Thank you very much, ram ghadiyaram!
Solution 2 solved my problem, but only on my local Ubuntu; when I tested on HDFS the problem remained. Solution 1 is the definitive solution. My code now looks like this:
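The original code block did not survive here. As a rough sketch only, assuming "Solution 1" refers to the cache-and-show approach from the answer below (sparkSession, path and the Avro format are reused from the question; everything else is illustrative):

import org.apache.spark.sql.SaveMode

// Read the dataset, then pin it in memory and force the read with an action,
// so the subsequent overwrite no longer depends on the files it deletes.
val dataset = sparkSession.read
  .format("avro")
  .load(path)

dataset.cache()
dataset.count() // action: materializes the cache before the files are replaced

dataset.write
  .format("avro")
  .mode(SaveMode.Overwrite)
  .save(path)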
Someone suggested a related post: I did look at that post and tried to solve the problem that way, but without success. I ruled it out as being my problem for the following reasons:
I created a temp folder under "main_folder" called "sub_folder_temp" and saving still failed.
Saving the same non-empty dataset to the same path, but in JSON format, actually works without the workaround discussed there.
Saving an empty dataset of the same type [T] to the same path actually works without the workaround discussed there.
vxqlmq5t2#
The error message is misleading; the actual problem is reading from and writing to the same location. Because the read is lazy, SaveMode.Overwrite deletes the existing files before the tasks feeding the write have actually read them, which is why they fail with FileNotFoundException. I am giving an example different from yours (it uses Parquet; in your case it is Avro). I have two options for you.
Option 1: cache and show will work, as in the sketch below.
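The original code example for Option 1 is not preserved here; a minimal sketch of the idea (Parquet instead of Avro, as in the answerer's example; the path and sample data are illustrative):

import org.apache.spark.sql.SaveMode
import sparkSession.implicits._

// Write a small DataFrame, then read it back from the same folder.
val df = Seq((1, 10), (2, 20), (3, 30)).toDF("id", "value")
df.write.format("parquet").mode(SaveMode.Overwrite).save("/tmp/parquet_demo")

val df1 = sparkSession.read.format("parquet").load("/tmp/parquet_demo")
df1.cache()     // keep the rows in memory
df1.show(false) // action: materializes the cache (use count() to force all partitions)

// Overwriting the same location now works, because the write is served from the
// cache instead of the files it is about to delete.
df1.write.format("parquet").mode(SaveMode.Overwrite).save("/tmp/parquet_demo")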
Option 2:
1) Save the DataFrame to a different avro folder.
2) Delete the old avro folder.
3) Finally, rename the newly created avro folder to the old name; that will work (a sketch follows below).
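A minimal sketch of Option 2, assuming a Hadoop-compatible file system; the folder names and the use of the Hadoop FileSystem API are illustrative, not taken from the original answer:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val targetDir = "/path/to/main_folder/sub_folder"      // folder being updated
val tempDir   = "/path/to/main_folder/sub_folder_tmp"  // temporary avro folder

// 1) Save the updated dataset to a different folder (the old one is still readable).
dataset.write.format("avro").mode(SaveMode.Overwrite).save(tempDir)

// 2) Delete the old folder and 3) rename the new folder to the old name.
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
fs.delete(new Path(targetDir), true) // recursive delete
fs.rename(new Path(tempDir), new Path(targetDir))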