fs.rename(新路径(rawfilename)、新路径(processfilename))不起作用

xcitsw88  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(347)

我正在研究基于scala的apachespark实现,用于将数据从远程位置加载到hdfs,然后将数据从hdfs摄取到hive表。
使用我的第一份spark工作,我将数据/文件安装到hdfs中的某个位置,比如-
hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/文件夹
让我们考虑一下,在安装了ct\u click \u basic.csv和ct\u click \u basic1.csv.gz文件之后,我在hdfs中有以下文件[共享位置的文件名将是此处的文件夹名,其内容将显示在-xx文件的一部分]:
[root@sandbox ~]#hdfs dfs-ls/data/analytics/raw/*/找到3项
-rw-r--r--3 chauhan.bhupesh hdfs 0 2017-07-27 15:02/data/analytics/raw/ct\u click\u basic.csv//u成功
-rw-r--r--3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02/data/analytics/raw/ct\u click\u basic.csv/part-00000
-rw-r--r--3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02/data/analytics/raw/ct\u click\u basic.csv/part-00001
找到2个项目
-rw-r--r--3 chauhan.bhupesh hdfs 0 2017-07-27 15:02/data/analytics/raw/ct\u click\u basic1.csv.gz//u成功
-rw-r--r--3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02/data/analytics/raw/ct\u click\u basic1.csv.gz/part-00000
现在使用我的另一个spark作业,我想根据每个阶段执行的任务,将这些文件从/raw文件夹移动到/process,最后移动到hdfs中的/archive文件夹。
为此,我首先使用以下代码获取/raw文件夹下所有文件的列表:

def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
           if(!fileStatus.isDirectory()) {
                filePaths+=fileStatus.getPath()      
            }
            else {
                listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
            }
        }
  }   
  filePaths
}

然后使用以下代码行,尝试将/raw文件夹中的文件重命名/移动到/process文件夹:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }

但我不能移动/重命名这些文件使用上述编写的代码。我试着调试代码,发现fs.rename()返回“false”。
请注意:当我在/data/analytics/raw folder ex ct.csv[或任何其他文件]中手动复制任何文件,然后运行fs.rename()时,我能够实现文件重命名/移动,但对part-xx文件无效。
有什么我不知道的吗?
任何快速的帮助都将不胜感激。
你好,布比什

dbf7pr2w

dbf7pr2w1#

最后,我有问题了。实际上,我正试图将文件从/data/analytics/raw/folder.csv/part-xx重命名为/data/analytics/process/folder.csv/part-xx,其中hdfs中存在/data/analytics/process,但“folder.csv”不存在;因此它在重命名时返回false。我已经在我的代码中添加了以下行,对我来说效果很好

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size

   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)

   var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
   var processFolderPath = new Path(processFolderName)
   if(!(fs.exists(processFolderPath)))
         fs.mkdirs(processFolderPath)
   val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }
ddrv8njm

ddrv8njm2#

如果新路径(rawfilename)不存在,rename可以返回false。
在fs.rename之前,请检查文件是否存在:

if (fs.exists(somePath)) {
 fs.rename...
}

另一个原因可能是您试图重命名的文件被某人使用。或者,如果您尝试重命名目录,其中的一些文件可能会被某人使用。
要确保这是根本原因,请尝试重命名代码中的其他文件:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path("**/TESTDIR1**"), new Path("**/TESTDIR2**"))
 }

如果此重命名成功,则根本原因确实处于竞争状态。

相关问题