如何在spark streaming中加载目录中的一些文件并监视该目录中的新文件而不丢失?

5f0d552i  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(125)

我有一个hdfs目录,其中包含许多文件:

/user/root/1.txt
/user/root/2.txt
/user/root/3.txt
/user/root/4.txt

并且有一个守护进程,它每分钟向此目录添加一个文件。(例如,5. txt,6. txt,7. txt..)
我想启动一个Spark流作业,加载3.txt,4.txt,然后检测4.txt后的所有新文件。
请注意,由于这些文件很大,处理这些文件将花费很长时间。因此,如果我在启动流任务之前处理3.txt和4.txt,可能在处理3.txt和4.txt的过程中会将5.txt,6.txt产生到这个dir中。而当流任务启动时,5.txt和6.txt将无法处理,因为它将仅从新文件(从7.txt)处理
我不确定我是否清楚地描述了这个问题,如果你有任何问题,请问我

ars1skjm

ars1skjm1#

我找到了一个解决办法:
根据文档API:https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.streaming.StreamingContext

def
fileStream[K, V, F <: InputFormat[K, V]](directory: String, filter: (Path) ⇒ Boolean, newFilesOnly: Boolean)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[F]): InputDStream[(K, V)]

创建一个输入流,用于监视Hadoop兼容的文件系统中的新文件,并使用给定的键值类型和输入格式读取它们。
我们可以设置filter函数来过滤文件< 4.txt
然后将“newFilesOnly”设置为false

相关问题