获取保存Parquet文件的默认hdfs路径

h9a6wy2h  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(410)

我运行了一个spark作业,最终保存了一个Parquet文件,该作业成功完成。但是,我只指定了文件名,没有指定hdfs路径。有没有办法打印出spark将文件写入的默认hdfs路径?我看着 sc._conf.getAll() ,但那里似乎没有什么有用的东西。

5q4ezhmt

5q4ezhmt1#

这是一种方式(除了简单的命令方式之外) hadoop fs -ls -R | grep -i yourfile )....
下面是示例scala代码段(如果您想在python或java中实现,您可以模拟相同的api调用(api调用)来获取Parquet文件列表。像下面这样过滤它们。。。。

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}
//other imports here 
lazy val sparkConf = new SparkConf()    
 lazy val sc = SparkContext.getOrCreate(sparkConf)   
 lazy val fileSystem = FileSystem.get(sc.hadoopConfiguration)
    val fileSystem = listChaildStatuses(fileSystem , new Path("yourbasepathofHDFS")) // normally hdfs://server/user like this...
  val allparquet = fileSystem.filter(_.getPath.getName.endsWith(".parquet"))
// now you can print these parquet files out of which your files will be present and you can know the base path...

支撑方法如下

/**
        * Get [[org.apache.hadoop.fs.FileStatus]] objects for all Chaild children (files) under the given base path. If the
        * given path points to a file, return a single-element collection containing [[org.apache.hadoop.fs.FileStatus]] of
        * that file.
        */
     def listChaildStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
        listChaildStatuses(fs, fs.getFileStatus(basePath))
    }

 /**
    * Get [[FileStatus]] objects for all Chaild children (files) under the given base path. If the
    * given path points to a file, return a single-element collection containing [[FileStatus]] of
    * that file.
    */
  def listChaildStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
    def recurse(status: FileStatus): Seq[FileStatus] = {
      val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory)
      leaves ++ directories.flatMap(f => listChaildStatuses(fs, f))
    }

    if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus)
  }

相关问题