java.io.IOException: Failed to rename when writing to an HDFS path

Posted by yyyllmsg on 2021-05-27 in Spark

I am using spark-sql 2.4.1, which depends on hadoop-2.6.5.jar. I need to save the data to HDFS first and move it to Cassandra later, so I am trying to save it to HDFS as follows:

String hdfsPath = "/user/order_items/";
cleanedDs.createOrReplaceTempView("source_tab");

givenItemList.parallelStream().forEach( item -> {
    // item is assumed to be a column name of source_tab
    String query = String.format(
        "select '%s' as itemCol, avg(%s) as mean from source_tab group by year", item, item);
    Dataset<Row> resultDs = sparkSession.sql(query);

    saveDsToHdfs(hdfsPath, resultDs );
});

public static void saveDsToHdfs(String parquet_file, Dataset<Row> df) {
    df.write()                                 
      .format("parquet")
      .mode("append")
      .save(parquet_file);
    logger.info("Saved parquet file: " + parquet_file + " successfully");
}

When I run the job on the cluster, it throws the following error:

java.io.IOException: Failed to rename FileStatus{path=hdfs:/user/order_items/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.parquet; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to hdfs:/user/order_items/part-00007.parquet
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)

Please suggest how to fix this issue.

xmq68pz9  #1

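The error occurs because you are writing a DataFrame to the same location for every item in givenItemList. With the default save mode, doing this would normally fail with
Output directory already exists
but because forEach on a parallel stream processes the items in parallel threads, several write jobs hit the same path at the same time. They most likely stage their files under the same /user/order_items/_temporary directory, so when one job commits and cleans that directory up, another job's rename fails, which is the error you see.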
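To make the failure mode concrete, here is a toy model in plain Java (no Hadoop or Spark involved). It assumes the layout visible in the stack trace, where files are staged under `<output>/_temporary/0/<attempt>/`, and it assumes that a commit moves the staged file into the output directory and then deletes `_temporary` as a whole. The class and method names are made up for illustration, and the real committer is more involved than this.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Toy model of two write jobs sharing one staging directory; not Hadoop code. */
final class SharedTempDirDemo {

    /** "Task" step: write a part file under <output>/_temporary/0/<attempt>/. */
    static Path writeAttempt(Path output, String attempt, String fileName) throws IOException {
        Path dir = output.resolve("_temporary").resolve("0").resolve(attempt);
        Files.createDirectories(dir);
        return Files.write(dir.resolve(fileName), attempt.getBytes());
    }

    /** "Commit" step: move the staged file into the output, then drop _temporary entirely. */
    static void commit(Path output, Path staged) throws IOException {
        Files.move(staged, output.resolve(staged.getFileName()));
        deleteRecursively(output.resolve("_temporary"));
    }

    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order lists children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    /** Returns true if job B's commit fails after job A has cleaned up the shared directory. */
    static boolean secondCommitFails() {
        try {
            Path output = Files.createTempDirectory("order_items");
            Path a = writeAttempt(output, "attempt_a", "part-a.parquet");
            Path b = writeAttempt(output, "attempt_b", "part-b.parquet");
            commit(output, a); // job A finishes first and deletes _temporary
            try {
                commit(output, b); // job B's staged file is already gone
                return false;
            } catch (IOException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second commit failed: " + secondCommitFails());
    }
}
```

In the real job the interleaving depends on thread timing, which would explain why the error can show up only on some runs or only on the cluster.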

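Give each item its own output path instead, for example:

givenItemList.parallelStream().forEach( item -> {
    String query = String.format(
        "select '%s' as itemCol, avg(%s) as mean from source_tab group by year", item, item);
    Dataset<Row> resultDs = sparkSession.sql(query);
    // Sibling directory per item, e.g. /user/order_items_<item>
    saveDsToHdfs(String.format("%s_%s", hdfsPath.replaceAll("/+$", ""), item), resultDs);
});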
Alternatively, you can write each item to its own subdirectory under hdfsPath:

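givenItemList.parallelStream().forEach( item -> {
    String query = String.format(
        "select '%s' as itemCol, avg(%s) as mean from source_tab group by year", item, item);
    Dataset<Row> resultDs = sparkSession.sql(query);

    // Subdirectory per item, e.g. /user/order_items/<item>
    saveDsToHdfs(String.format("%s/%s", hdfsPath.replaceAll("/+$", ""), item), resultDs);
});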
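If the item names are not guaranteed to be safe directory names, the per-item path can be built by a small helper. The sketch below is only a suggestion: `ItemPaths` and `itemOutputPath` are hypothetical names, and the `item=` prefix is my own choice, not something from the question. It keeps the directory name from starting with `_` or `.` (which Hadoop and Spark readers treat as hidden), and Spark's partition discovery should then expose `item` as a column when the base directory is read back.

```java
/** Hypothetical helper: derives one output directory per item under a base path. */
final class ItemPaths {

    static String itemOutputPath(String basePath, String item) {
        // Drop trailing slashes so the join never yields "//".
        String base = basePath.replaceAll("/+$", "");
        // Keep letters, digits, '_' and '-'; replace every other character with '_'.
        String safe = item.trim().replaceAll("[^A-Za-z0-9_-]", "_");
        if (safe.isEmpty()) {
            throw new IllegalArgumentException("item must not be blank");
        }
        return base + "/item=" + safe;
    }
}
```

It would be called as `saveDsToHdfs(ItemPaths.itemOutputPath(hdfsPath, item), resultDs)`. Note that two different items can map to the same directory after sanitizing (for example "a b" and "a_b"), so this only helps if the item names stay distinct.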

57hvy0tb  #2

You can run all the selects in a single job: build one result per item, union them into a single table, and write that table once.

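// sql() only builds the query plan here; no Spark job runs until the write below
Dataset<Row> resultDs = givenItemList.stream().map( item -> {
    String query = String.format(
        "select '%s' as itemCol, avg(%s) as mean from source_tab group by year", item, item);
    return sparkSession.sql(query);
}).reduce((a, b) -> a.union(b)).get();   // throws NoSuchElementException if the list is empty

saveDsToHdfs(hdfsPath, resultDs );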
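The same idea can also be expressed as one SQL statement instead of many Dataset objects. The sketch below only builds the query string, so it runs without Spark. `UnionQueryBuilder` and `buildUnionQuery` are hypothetical names, and it assumes that every item is a trusted column name of the table and that `itemCol` is meant to carry the item's name as a label. It uses `union all` because, as far as I know, `Dataset.union` does not remove duplicates either.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: builds one UNION ALL query covering every item. */
final class UnionQueryBuilder {

    static String buildUnionQuery(List<String> items, String table) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        return items.stream()
            // One select per item; the item name is both the label and the averaged column.
            .map(item -> String.format(
                "select '%s' as itemCol, avg(%s) as mean from %s group by year",
                item, item, table))
            .collect(Collectors.joining(" union all "));
    }
}
```

The result would be passed to `sparkSession.sql(...)` and saved with a single `saveDsToHdfs(hdfsPath, resultDs)` call, so only one write job touches the output path.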
