spark:java.io.filenotfoundexception:copymerge中不存在文件

mtb9vblg  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(479)

我正在尝试合并一个目录中的所有spark输出部件文件,并在scala中创建一个文件。
这是我的密码:

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}

然后在最后一步,我写Dataframe输出如下。

dfMainOutputFinalWithoutNull.repartition(10).write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("nullValue", "")
  .option("header", "true")
  .option("codec", "gzip")
  .mode("overwrite")
  .save(outputfile)
  merge(mergeFindGlob, mergedFileName )
  dfMainOutputFinalWithoutNull.unpersist()

当我运行这个我得到以下例外

java.io.FileNotFoundException: File does not exist: hdfs:/user/zeppelin/FinancialLineItem/temp_FinancialLineItem
  at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)

这就是我如何得到我的输出

我想合并一个文件夹中的所有文件并创建一个文件,而不是文件夹。

yqlxgs2m

yqlxgs2m1#

hadoop 2中有一个copymerge api:https://hadoop.apache.org/docs/r2.7.1/api/src-html/org/apache/hadoop/fs/fileutil.html#line.382
不幸的是,在hadoop3.0中这将被弃用并删除。
以下是copymerge的实现(尽管是在pyspark中),我不得不这样写,因为我们找不到更好的解决方案:https://github.com/tagar/stuff/blob/master/copymerge.py
希望它也能帮助别人。

相关问题