spark流找到文件,但声称找不到该文件

prdp8dxp  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(483)

我有下面的-它监视一个目录并每x秒拉入一次日志。
我的问题是:
我让脚本运行
然后在目录中创建一个文件(比如testfile.txt)
然后脚本会错误地说textfile.txt不存在
它找到了文件和文件名,所以它确实存在并找到了它。
我看到的是我用一个 file:/// 它返回一个找不到的错误 file:/ . 所以它似乎少了两个//因为某些原因:
谢谢你的帮助!!!!
代码


# only files after stream starts

df = spark_session\
    .readStream\
    .option('newFilesOnly', 'true')\
    .option('header', 'true')\
    .schema(myschema)\
    .text('file:///home/keenek1/analytics/logs/')\
    .withColumn("FileName", input_file_name())

错误

FileNotFoundException: File file:/home/keenek1/analytics/logs/loggywoggywoo.txt does not exist\
kr98yfug

kr98yfug1#

请将文件:///更改为hdfs://.

df = spark_session\
    .readStream\
    .option('newFilesOnly', 'true')\
    .option('header', 'true')\
    .schema(myschema)\
    .text('hdfs://home/keenek1/analytics/logs/')\ # changed file:/// to hdfs://
    .withColumn("FileName", input_file_name())

对于下面的问题,如果相同的日志文件被覆盖,比如每小时,检查点不会重新处理该文件。我需要它说'如果修改的时间改变,重新处理'-这是可能的吗?
解决方法是,将spark流指向不同的目录&使用spark监听器从实际目录检查文件时间戳如果文件时间戳有任何更改,请使用新名称将该文件移动到流目录
让我知道如果你想要代码,我可以给你在scala,可能你需要转换成python。

相关问题