我有一个hdfs目录,其中包含许多文件:
/user/root/1.txt
/user/root/2.txt
/user/root/3.txt
/user/root/4.txt
并且有一个守护进程,它每分钟向此目录添加一个文件。(例如,5. txt,6. txt,7. txt..)
我想启动一个Spark流作业,加载3.txt,4.txt,然后检测4.txt后的所有新文件。
请注意,由于这些文件很大,处理这些文件将花费很长时间。因此,如果我在启动流任务之前处理3.txt和4.txt,可能在处理3.txt和4.txt的过程中会将5.txt,6.txt产生到这个dir中。而当流任务启动时,5.txt和6.txt将无法处理,因为它将仅从新文件(从7.txt)处理
我不确定我是否清楚地描述了这个问题,如果你有任何问题,请问我
1条答案
按热度按时间ars1skjm1#
我找到了一个解决办法:
根据文档API:https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.streaming.StreamingContext
创建一个输入流,用于监视Hadoop兼容的文件系统中的新文件,并使用给定的键值类型和输入格式读取它们。
我们可以设置filter函数来过滤文件< 4.txt
然后将“newFilesOnly”设置为false