我有一个文件源流,它从一个s3 bucket读取数据并将其结果写入另一个bucket,如下所示:
data_sdf = spark.readStream \
.schema(input_data.schema) \
.parquet("s3://my_input_folder")
results_sdf = process(data_sdf)
results_query = results_sdf.writeStream \
.format("parquet") \
.option("path", "s3://my_results_folder") \
.option("checkpointLocation", "s3://my_checkpoint_folder") \
.queryName("results_query") \
.start()
其中process()
是用于转换Spark DataFrame的任意函数。
然而,在“启动”results_query
流之后,这些语句:
print(spark.streams.active)
print(results_query.status)
print(results_query.lastProgress)
导致:
[]
{'message': 'Terminated with exception: No usable value for path\nDid not find value which can be converted into java.lang.String', 'isDataAvailable': False, 'isTriggerActive': False}
None
我没有说的是,我有 * 另一个 * 结构化流,它将文件写入输入文件夹s3://my_input_folder
。然而,我仍然得到上面的异常,即使在尝试启动上面的一个之前停止了这个其他流。但是如果我只是简单地将一个常规的、非流的DataFrame写入同一个输入文件夹,那么上面的流就可以工作了。
有人知道我哪里做错了吗?
有趣的是,关于结构化流的Spark教程在这里提到了文件输入源:
注2:启用此选项时,不应使用来自多个源或查询的源路径。同样,您必须确保源路径与文件流接收器的输出目录中的任何文件都不匹配。
所以也许我不应该试图从同一个文件夹中读取文件,而我从另一个流中写入文件;并且预期相当神秘的错误No usable value for path
。我猜文件流接收器不会在目标目录中原子地写入文件。
1条答案
按热度按时间hkmswyz61#
您可以通过为目标文件/表创建新的检查点位置来解决这个问题,而不是重新使用早期流中的检查点,因为这里我们将流数据作为源处理。