list/retrieve hdfs partitions as map(string,list(string))from spark

7lrncoxx  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(324)

我想知道是否有某种方法可以利用spark中hdfs文件夹结构中已经存在的元数据信息。例如,我使用以下代码将数据写入hdfs,

val columns = Seq("country","state")
dataframe1.write.partitionBy(columns:_*).mode("overwrite").
save(path)

这会生成类似的目录结构,

path/country=xyz/state=1
path/country=xyz/state=2
path/country=xyz/state=3
path/country=abc/state=4

我想知道的是使用spark,有没有一种方法可以将所有的分区和子分区作为 Map(String,List(String)) (不加载整个文件并使用groupby?),其中键是分区,值是该分区内所有子分区的列表。
上述示例的输出类似于以下内容

Map(xyz->List(1,2,3),abc->(4))
vqlkdk9b

vqlkdk9b1#

你的hdfs文件结构是这样的。。。

$tree path
path
├── country=abc
│   └── state=4
└── country=xyz
    ├── state=1
    ├── state=2
    ├── state=3
    ├── state=4
    ├── state=5
    └── state=6

您需要使用它来获取字符串形式的完整路径。。

val lb = new ListBuffer[String]
  def getAllFiles(path:String, sc: SparkContext):Unit = {
  val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    val files: RemoteIterator[LocatedFileStatus] = fs.listLocatedStatus(new Path(path))
    while(files.hasNext) {
      var filepath = files.next.getPath.toString
      //println(filepath)
      lb += (filepath)
      getAllFiles(filepath, sc)
    }
    println(lb)
  }

一旦你得到listbuffer所有文件的完整路径,包括子文件夹。。。您需要编写逻辑来填充Map。我要把它留给你。蒂伊。。
注意:listbuffer有一个组,您可以使用该组返回Map
在我的情况下,我做了这样的实验。。。

println( lb.groupBy(_.toString.replaceAll("file:/Users/xxxxxx/path/country=", "")substring(0, 3) ))

我得到的结果是

Map(abc -> ListBuffer(file:/Users/xxxxxx/path/country=abc, file:/Users/xxxxxx/path/country=abc/state=4), xyz -> ListBuffer(file:/Users/xxxxxx/path/country=xyz, file:/Users/xxxxxx/path/country=xyz/state=1, file:/Users/xxxxxx/path/country=xyz/state=6, file:/Users/xxxxxx/path/country=xyz/state=3, file:/Users/xxxxxx/path/country=xyz/state=4, file:/Users/xxxxxx/path/country=xyz/state=5, file:/Users/xxxxxx/path/country=xyz/state=2))

也许你可以用这个想法来进一步完善你想要的结果。

相关问题