我正在尝试从s3获取最近10天内的最新文件,但输入中不存在任何文件。问题是路径包含日期。
我的道路是这样的:
val path = "s3://bucket-info/folder1/folder2"
val date = "2019/04/12" ## YYYY/MM/DD
我这样做=
val update_path = path+"/" +date //this will become s3://bucket-info/folder1/folder2/2019/04/12
def fileExist(path: String, sc: SparkContext): Boolean = FileSystem.get(getS3OrFileUri(path),
sc.hadoopConfiguration).exists(new Path(path + "/_SUCCESS"))
if (fileExist(update_path, sc)) {
//read and process the file
} else {
log("File not exist")
// I need to get the latest file in the last five days and use. So that I can check "s3://bucket-info/folder1/folder2/2019/04/11" , s3://bucket-info/folder1/folder2/2019/04/10 and others. If no latest file in last 5 days. throw error. s
}
但我的问题是,到了月底怎么检查?我可以做它在for循环,但有没有任何优化和优雅的方式来做到这一点在Spark?
1条答案
按热度按时间cnwbcb6i1#
不是很理想,但如果你想利用Spark,Dataframe读取器可以采取多种途径和方法
input_file_name
为您提供路径:这是非常低效的,而且没有明确的方法可以避免使用spark作为s3hadoop文件系统实现使用递归结构时效率不高的问题。如果您愿意直接使用s3api,可以设置
s"$path/${fmt.format(end.minusDays(10))}"
作为start after参数,并使用类似于这样的内容列出键。这是因为s3总是返回按字母顺序排序的键列表,并且日期键中没有填充。