Apache Spark 覆盖S3上的 Dataframe 时引发FileNotFoundException异常

2nc8po8w  于 2023-03-09  发布在  Apache
关注(0)|答案(3)|浏览(123)

我已经将存储在S3上两个位置的 parquet 文件分区到同一存储桶中:

path1: s3n://bucket/a/
path2: s3n://bucket/b/

数据具有相同的结构,我想从第一个位置读取文件,并使用spark sql将它们聚合到第二个位置,下面是代码片段:

val df1 = sql.read.parquet(path1)
val df2 = sql.read.parquet(path2)

val df = df1.unionAll(df2)

df.write.mode(SaveMode.Overwrite).parquet(path1)

当我运行这段代码时,我得到了以下异常:

java.io.FileNotFoundException: No such file or directory 
s3n://a/part-r-00001-file.gz.parquet

我使用的是spark 1.6.1和scala 2.11。

myzjeezk

myzjeezk1#

我没有找到这个问题的直接解决方案,所以我使用了一个变通方案:

val df2 = sql.read.parquet(path2)
df2.write.mode(SaveMode.Append).parquet(path1)

val df1 = sql.read.parquet(path1)
df1.write.mode(SaveMode.Overwrite).parquet(path1)
yduiuuwa

yduiuuwa2#

在上述情况下,由于读取和写入路径相同,一致性模型变为最终一致性。给予个例子,在您的代码中,数据从相同的路径1读取,最终数据也是到路径1,它最初读取数据,但当spark开始写入数据时,正在读取的数据被删除,因此会出现java.io.FileNotFound异常。因此,如果可能,您应该使用不同的路径写入最终输出。
这种读写甚至被认为是spark sql中的错误,当你从一个表中读取一个 Dataframe 并写入同一个表时,spark会抛出一个错误,告诉你不允许读取和写入同一个位置。

idfiyjo8

idfiyjo83#

建议使用Spark、Yarn、分段方向

private fun read(
        target: String,
        extension: Extension,
        repartition: Int? = 1,
    ): Dataset<Row> {
        return SparkUtil.make()
            .read()
            .format(extension.name)
            .load(target)
            .repartition(repartition!!)
    }

    private fun merge(
        target: String,
        extension: Extension,
        repartition: Int? = 1,
    ) {
        val ds = read(target, extension, repartition)

        val dir = SparkUtil.conf("spark.yarn.stagingDir") // stagingDir
        val id = SparkUtil.id() // applicationId
        val temp = "$dir/$id"

        ds.write()
            .format(extension.name)
            .mode(SaveMode.Overwrite)
            .save(temp)

        read(temp, extension, repartition)
            .write()
            .format(extension.name)
            .mode(SaveMode.Overwrite)
            .save(target)
    }

相关问题