python Spark结构化流水印问题

x8diyxa7  于 2023-03-07  发布在  Python
关注(0)|答案(1)|浏览(163)
    • bounty已结束**。此问题的答案可获得+50的信誉奖励。奖励宽限期将在20小时后结束。Gatto Nou正在寻找来自信誉良好来源的答案

我正在尝试用下面的代码做一个简单的流。我的输入是一本随机的书,我计算其中唯一单词的数量。在这之后,我想把输出保存到一个文件中。

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, col, regexp_extract, lower

spark = SparkSession.builder.appName("Streaming").getOrCreate()

book = spark.readStream.option("maxFilesPerTrigger", 1).text("/FileStore/tables/another_books/")

output = (book
          .select(split(col("value"), " ").alias("line"))
          .select(explode(col("line")).alias("word"))
          .select(lower(col("word")).alias("word"))
          .select(regexp_extract(col("word"), "[a-z']*", 0).alias("word"))
          .where(col("word") != "")
          .groupby(col("word")).count()
         )

streamingQuery = (output
                  .writeStream
                  .outputMode("append")
                  .format("json")
                  .option("path", "/FileStore/tables/outputStream")
                  .option("checkpointLocation", "/FileStore/tables/checkpointLocation")
                  .start()
                  .awaitTermination()
                 )

由于此错误,此代码不起作用:AnalysisException:*当不带水印的流式 Dataframe /数据集上存在流式聚合时,不支持追加输出模式;
我尝试在
groupby操作之前添加withWatermark
*('time ',' 10 minutes'),但是代码仍然不起作用...
谢谢您的建议!

6g8kf2rb

6g8kf2rb1#

Spark不允许你使用"append"输出模式,当你有没有水印的流聚合时,这是因为Spark需要能够定义一个时间点,它可以从这个时间点开始聚合结果。你提到你尝试添加一个水印,但是它不起作用--没有进一步的细节或者错误消息,很难说为什么。但是,通过指定列和阈值时间,确保正确使用水印,并且在流中的任何聚合之前使用水印。
例如:

from pyspark.sql.functions import window

output = (book
          .select(split(col("value"), " ").alias("line"))
          .select(explode(col("line")).alias("word"))
          .select(lower(col("word")).alias("word"))
          .select(regexp_extract(col("word"), "[a-z']*", 0).alias("word"))
          .where(col("word") != "")
          .withWatermark("timestamp", "10 minutes")
          .groupBy(window(col("timestamp"), "10 minutes"), col("word"))
          .count()
         )

streamingQuery = (output
                  .writeStream
                  .outputMode("append")
                  .format("json")
                  .option("path", "/FileStore/tables/outputStream")
                  .option("checkpointLocation", "/FileStore/tables/checkpointLocation")
                  .start()
                  .awaitTermination()
                 )

相关问题