我们正在运行一个Spark流作业,它从一个目录中检索文件(使用textFileStream)。我们担心的一个问题是,作业已经关闭,但文件仍然被添加到目录中。一旦作业再次启动,这些文件不会被拾取(因为它们不是新的或在作业运行时更改的),但我们希望它们被处理。
1)有没有解决方案?有没有办法跟踪哪些文件已经处理,我们可以“强制”旧文件被拾起?
2)有没有办法删除已处理的文件?
我们正在运行一个Spark流作业,它从一个目录中检索文件(使用textFileStream)。我们担心的一个问题是,作业已经关闭,但文件仍然被添加到目录中。一旦作业再次启动,这些文件不会被拾取(因为它们不是新的或在作业运行时更改的),但我们希望它们被处理。
1)有没有解决方案?有没有办法跟踪哪些文件已经处理,我们可以“强制”旧文件被拾起?
2)有没有办法删除已处理的文件?
3条答案
按热度按时间fgw7neuy1#
下面的文章几乎涵盖了你所有的问题。
https://blog.yanchen.ca/2016/06/28/fileinputdstream-in-spark-streaming/
流读取器在启动作业/应用程序时使用系统时钟启动批处理窗口。显然,之前创建的所有文件都将被忽略。请尝试启用检查点设置。
删除文件可能是不必要的。如果检查点工作,Spark会识别未处理的文件。如果由于某种原因要删除文件,请实现自定义输入格式和读取器(请参考文章)来捕获文件名并适当使用此信息。但我不推荐这种方法。
v6ylcynt2#
你第二个问题的答案,
现在在Spark 3中已经可以了。你可以使用“cleanSource”选项来读取流。
感谢文档https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html和此视频https://www.youtube.com/watch?v=EM7T34Uu2Gg。
经过几个小时的寻找,终于找到了解决办法
gmxoilav3#
根据我的经验,我不能使用检查点功能,所以我不得不删除/移动已经进入每个批次的处理过的文件。
获取这些文件的方法有点棘手,但基本上我们可以说它们是当前
RDD
的祖先(依赖项)。然后我使用的是一种递归方法,该方法爬取该结构并恢复开始以hdfs
开头的RDD
的名称。字符串
因此,在
forEachRDD
方法中,您可以轻松地调用它:型