How to check whether files exist at an HDFS path?

hxzsmxv2 · posted 2021-05-31 in Hadoop
Follow (0) | Answers (2) | Views (752)

How can I check whether files exist under a given base path? I pass this method a list of file names, e.g. file1.snappy, file2.snappy, ...
I need to check whether each file exists at a given path, e.g. hdfs://a/b/c/source/file1.snappy or hdfs://a/b/c/target/file1.snappy. How can I update/modify the method below to accept /a/b/c/target/ or /a/b/c/source/ as the base path and check whether the files exist there? If a file exists in source, it should be added to a source list; if it exists in target, to a target list.

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def fileExists(fileList: Array[String]): Boolean = {
  var fileNotFound = 0
  fileList.foreach { file =>
    if (!fs.exists(new Path(file))) fileNotFound += 1
    println(s"fileList: $file")
  }
  if (fileNotFound > 0) {
    println(s"$fileNotFound files not found, probably moved")
    false
  } else
    true
}
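
In other words, the goal is roughly the following. This is only a sketch of the desired behavior; splitByLocation, sourceBase, and targetBase are hypothetical names, not existing code:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper sketching the desired behavior: given bare file
// names, return those found under the source base and those under target.
def splitByLocation(fs: FileSystem,
                    fileNames: Seq[String],
                    sourceBase: String,  // e.g. "/a/b/c/source/"
                    targetBase: String   // e.g. "/a/b/c/target/"
                   ): (Seq[String], Seq[String]) = {
  val inSource = fileNames.filter(f => fs.exists(new Path(sourceBase + f)))
  val inTarget = fileNames.filter(f => fs.exists(new Path(targetBase + f)))
  (inSource, inTarget)
}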

bvhaajcl1#

Updated working code for both HDFS and S3.
Please check the code below.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._

// For converting to scala Iterator
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
}

import java.net.URI

def fs(path: String) = FileSystem.get(URI.create(path),spark.sparkContext.hadoopConfiguration)

// Exiting paste mode, now interpreting.

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
fs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@640517de

Sample directories

scala> "tree /tmp/examples".!
/tmp/examples

0 directories, 0 files

scala> "tree /tmp/sample".!
/tmp/sample
├── aaa
│   └── sample.json
├── bbb
│   └── bbb.json
├── ccc
└── ddd

4 directories, 2 files

Result

scala> List("/tmp/sample","/tmp/examples")
.flatMap(dir => {
    fs(dir)
    .listFiles(new Path(dir),true)
    .toList
    .filter(_.isFile)
    .map(d => (d.getPath.getParent,d.getPath))
// If you want only Boolean values, change the above line to: .map(d => (d.getPath.getParent, d.isFile))
})
.foreach(println)

(/tmp/sample/bbb,file:/tmp/sample/bbb/bbb.json)
(/tmp/sample/aaa,file:/tmp/sample/aaa/sample.json)
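
As a follow-up, the same listing reduces to a plain existence check for a single file name. A sketch reusing the fs(...) helper and the RemoteIterator implicit defined above (sample.json is taken from the sample layout):

// Sketch: does sample.json exist anywhere under /tmp/sample?
val found = fs("/tmp/sample")
  .listFiles(new Path("/tmp/sample"), true)
  .toList
  .exists(_.getPath.getName == "sample.json")

println(found) // true for the sample layout above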

t40tm48m2#

I have source and target directories set up like the example below.

Try this approach for a recursive lookup. The URI.create(...) call is very important when dealing with S3 objects (it also works for HDFS/local FS).

import java.net.URI

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.spark.SparkContext

import scala.collection.mutable.ArrayBuffer

/**
  * getAllFiles - get all files recursively from the sub folders.
  *
  * @param path String
  * @param sc SparkContext
  * @return Seq[String]
  */
def getAllFiles(path: String, sc: SparkContext): Seq[String] = {
  val conf = sc.hadoopConfiguration
  val fs = FileSystem.get(URI.create(path), conf)
  val files: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(path), true) // true for recursive lookup
  val buf = new ArrayBuffer[String]
  while (files.hasNext()) {
    val fileStatus = files.next()
    buf.append(fileStatus.getPath().toString)
  }
  buf.toSeq
}
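
To see why URI.create(...) matters: FileSystem.get(conf) alone always returns the default filesystem (fs.defaultFS), so an s3a:// path would be resolved against the wrong store. A sketch; the namenode host and bucket name are hypothetical:

import java.net.URI
import org.apache.hadoop.fs.FileSystem

val conf = sc.hadoopConfiguration

// Resolved per scheme: each call picks the FileSystem implementation
// registered for that URI scheme (hdfs, s3a, file, ...).
val hdfsFs = FileSystem.get(URI.create("hdfs://namenode:8020/data"), conf) // hypothetical namenode
val s3Fs   = FileSystem.get(URI.create("s3a://my-bucket/data"), conf)      // hypothetical bucket; needs hadoop-aws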

Usage example:

val spark: SparkSession = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  val sc = spark.sparkContext

  val myfiles: Seq[String] = getAllFiles("data/test_table", sc)
  myfiles.foreach(println)
  println(myfiles.contains("/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet"))

Result:

/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy.parquet
/data/test_table/target/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1111.parquet
/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy11.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy111.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet

true
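
Building on this, the recursive listing can be split into the source and target lists the question asks for. A sketch, assuming the /source/ and /target/ folder layout shown in the output above:

// Split the recursive listing by parent folder (folder names taken from
// the example layout above; adjust to your actual base paths).
val sourceFiles = myfiles.filter(_.contains("/source/"))
val targetFiles = myfiles.filter(_.contains("/target/"))

sourceFiles.foreach(println)
targetFiles.foreach(println)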
