我正在尝试用下面的代码做一个简单的流。我的输入是一本随机的书,我计算其中唯一单词的数量。在这之后,我想把输出保存到一个文件中。
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, col, regexp_extract, lower
spark = SparkSession.builder.appName("Streaming").getOrCreate()
book = spark.readStream.option("maxFilesPerTrigger", 1).text("/FileStore/tables/another_books/")
output = (book
.select(split(col("value"), " ").alias("line"))
.select(explode(col("line")).alias("word"))
.select(lower(col("word")).alias("word"))
.select(regexp_extract(col("word"), "[a-z']*", 0).alias("word"))
.where(col("word") != "")
.groupby(col("word")).count()
)
streamingQuery = (output
.writeStream
.outputMode("append")
.format("json")
.option("path", "/FileStore/tables/outputStream")
.option("checkpointLocation", "/FileStore/tables/checkpointLocation")
.start()
.awaitTermination()
)
由于此错误,此代码不起作用:AnalysisException:*当不带水印的流式 Dataframe /数据集上存在流式聚合时,不支持追加输出模式;
我尝试在groupby操作之前添加withWatermark*('time ',' 10 minutes'),但是代码仍然不起作用...
谢谢您的建议!
1条答案
按热度按时间6g8kf2rb1#
Spark不允许你使用"append"输出模式,当你有没有水印的流聚合时,这是因为Spark需要能够定义一个时间点,它可以从这个时间点开始聚合结果。你提到你尝试添加一个水印,但是它不起作用--没有进一步的细节或者错误消息,很难说为什么。但是,通过指定列和阈值时间,确保正确使用水印,并且在流中的任何聚合之前使用水印。
例如: