我有下面的-它监视一个目录并每x秒拉入一次日志。
我的问题是:
我让脚本运行
然后在目录中创建一个文件(比如testfile.txt)
然后脚本会错误地说textfile.txt不存在
它找到了文件和文件名,所以它确实存在并找到了它。
我看到的是我用一个 file:///
它返回一个找不到的错误 file:/
. 所以它似乎少了两个//因为某些原因:
谢谢你的帮助!!!!
代码
# only files after stream starts
df = spark_session\
.readStream\
.option('newFilesOnly', 'true')\
.option('header', 'true')\
.schema(myschema)\
.text('file:///home/keenek1/analytics/logs/')\
.withColumn("FileName", input_file_name())
错误
FileNotFoundException: File file:/home/keenek1/analytics/logs/loggywoggywoo.txt does not exist\
1条答案
按热度按时间kr98yfug1#
请将文件:///更改为hdfs://.
对于下面的问题,如果相同的日志文件被覆盖,比如每小时,检查点不会重新处理该文件。我需要它说'如果修改的时间改变,重新处理'-这是可能的吗?
解决方法是,将spark流指向不同的目录&使用spark监听器从实际目录检查文件时间戳如果文件时间戳有任何更改,请使用新名称将该文件移动到流目录
让我知道如果你想要代码,我可以给你在scala,可能你需要转换成python。