spark:如何在过去10天内从s3获取最新文件

jobtbby3  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(358)

我正在尝试从s3获取最近10天内的最新文件,但输入中不存在任何文件。问题是路径包含日期。
我的道路是这样的:

val path = "s3://bucket-info/folder1/folder2"

val date = "2019/04/12"    ## YYYY/MM/DD

我这样做=

val update_path = path+"/" +date //this will become s3://bucket-info/folder1/folder2/2019/04/12 

def fileExist(path: String, sc: SparkContext): Boolean = FileSystem.get(getS3OrFileUri(path),
  sc.hadoopConfiguration).exists(new Path(path + "/_SUCCESS"))

if (fileExist(update_path, sc)) {
    //read and process the file

} else {
       log("File not exist")
       // I need to get the latest file in the last five days and use. So that I can check "s3://bucket-info/folder1/folder2/2019/04/11" , s3://bucket-info/folder1/folder2/2019/04/10 and others. If no latest file in last 5 days. throw error. s

}

但我的问题是,到了月底怎么检查?我可以做它在for循环,但有没有任何优化和优雅的方式来做到这一点在Spark?

cnwbcb6i

cnwbcb6i1#

不是很理想,但如果你想利用Spark,Dataframe读取器可以采取多种途径和方法 input_file_name 为您提供路径:

val path = "s3://bucket-info/folder1/folder2"
val date = "2019/04/12"
val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")
val end = LocalDate.parse(date, fmt)
val prefixes = (0 until 10).map(end.minusDays(_)).map(d => s"$path/${fmt.format(d)}")

val prefix = spark.read
  .textFile(prefixes:_*)
  .select(input_file_name() as "file")
  .distinct()
  .orderBy(desc("file"))
  .limit(1)
  .collect().collectFirst {
  case Row(prefix: String) => prefix
}

prefix.fold {
  // log error
}
{ path =>
  //read and process the file
}

这是非常低效的,而且没有明确的方法可以避免使用spark作为s3hadoop文件系统实现使用递归结构时效率不高的问题。如果您愿意直接使用s3api,可以设置 s"$path/${fmt.format(end.minusDays(10))}" 作为start after参数,并使用类似于这样的内容列出键。这是因为s3总是返回按字母顺序排序的键列表,并且日期键中没有填充。

相关问题