hadoop 替代InMemoryFileIndex使用spark scala列出文件夹中的文件

lstz6jyr  于 2023-11-16  发布在  Hadoop
关注(0)|答案(2)|浏览(307)

我想解决的任务:

我在Azure存储中的特定文件夹中不断涌入文件。我想定期列出此文件夹中的文件,以便将它们复制到不同的位置。我正在跟踪我已经复制的文件,因此我正在从原始文件夹中的文件列表中删除已经复制的文件。

这个问题:

使用InMemoryFileIndex列出文件路径非常慢。我在代码中使用“y”,这是一组包含我想要的文件路径的字符串,以确定我有什么,然后稍后从中提取已经复制的文件路径:

val x = InMemoryFileIndex.bulkListLeafFiles(... my parameters ...)
val y = x.flatMap(_._2.map(.path))

字符串

问题:是否有更聪明的方法来列出我的所有文件?

例如,我是否可以创建一个表,它代表文件夹中的所有文件,同时也考虑到传入的文件,这样我就不必一次又一次地调用InMemoryFileIndex来遍历所有文件?

mf98qq94

mf98qq941#

没有更快的方法,adls对于文件列表来说非常慢(尽管它在过去几年中有所改进),Hadoop列表也使用线程,这些线程会快速吸收帐户上的可用连接。因此,值得使用Delta而不是parquet(无论运行时是否使用Databricks)。
查看using events,并在它们到达时做出React,而不是通过启动新作业来计时,或者运行一个作业来查看到达事件并将其存储在日志中以便在以后的批处理中进行处理。如果您使用的是Databricks,请查看autoloader以了解另一种可能的方法。
我建议,如果文件到达/发送也在您的控制之下,您也可以让发送者在处理时发送一个事件。

cld4siwp

cld4siwp2#

我现在正在尝试的解决方案是:

val streamingQuery = spark.readStream
.format("binaryFile")
.schema("`path` STRING, `modificationTime` TIMESTAMP, `length` BIGINT, `content` BINARY")
.option("recursiveFileLookup", "true")
.load("my path here")
.filter(col("modificationTime") > "2023-10-30 07:00:00")
.writeStream
.trigger(Trigger.Once)
.foreachBatch (my code goes here on how I copy files)
.option("checkpointLocation", "my path here")
.start()
.awaitTermination()

字符串
现在我已经在一个有几个文件的文件夹上测试了这个,它正在工作。我的下一个问题是:
检查点是否仅注册col(“modificationTime”)>“2023-10-30 07:00:00”的数据,或它在文件夹中读取的所有文件?

相关问题