我阅读数据从Kafka(startingOffsets作为最早)和写入控制台上的数据进行几次测试。已给出水印持续时间为10秒。以下Spark文档-https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication
result_df = parsed_df\
.withWatermark("event_time", "10 seconds")\
.dropDuplicates(["uuid"])
result_df\
.writeStream\
.option("checkpointLocation", "gs://test/checkpoint_ts/")\
.format("console")\
.start()
字符串
**将以下记录逐个传递到Kafka,并从spark结构化流中阅读每条记录:**忽略以下#
{“uuid”:10150,“event_time”:“2023-08- 07 T08:00:00.004071876Z”} #通过,显示在控制台上
{“uuid”:10151,“event_time”:“2023-08- 07 T09:00:00.004071876Z”} #通过,显示在控制台上
{“uuid”:10152,“event_time”:“2023-08- 07 T10:00:00.004071876Z”} #通过,显示在控制台上
{“uuid”:10150,“event_time”:“2023-08- 07 T11:00:00.004071876Z”} #已丢弃,未显示在控制台上(这是预期的吗?鉴于水印阈值现在为2023-08- 07 T10:00:00 - 10秒,因此该记录不应通过?)
{“uuid”:10153,“event_time”:“2023-08- 07 T06:00:00.004071876Z”} #已丢弃,预期给定的event_time早于水印阈值
问题:当阅读第4条uuid:10150的记录时,水印阈值为2023-08- 07 T10:00:00 - 10 sec。由于第一条具有相同uuid:10150的记录(具有不同的event_time福尔斯)比当前水印阈值更旧,因此第4条记录是否应通过给定的spark流式传输状态存储,而不是将第1条记录保存在存储器中,以便spark基于uuid将其检测为重复?
1条答案
按热度按时间y1aodyip1#
dropDuplicates
运算符有点微妙。下面是它工作的两种“模式”:1.如果你没有传入一个事件时间列,它将无限期地保持状态。在这种情况下,它会进行适当的“全局”重复数据删除,因为它会记住它看到的所有内容。
1.如果你传入一个事件时间列,它将清除小于水印的状态。它还使用你提供的所有列执行重复数据删除。
你做了什么
在你的例子中,你只传入了一个“value”列,即非事件时间列。这是“mode”1:
字符串
因此,
dropDuplicates
将无限期地保留状态。因此,示例中的记录0、1、2和4将被允许通过,但记录3将被删除重复数据,因为记录0中存在其UUID10150
。你想做的事
真的,你想做第二个“模式”:
型
让我们考虑一下你的例子,假设你这样做了。我将假设这些记录中的每一个都在它自己的批次中被处理,这样水印就更新了每一个记录(结构化流只更新每一个批次的水印)。
1.第一条记录被处理。(10150,8小时)被添加到状态。水印更新到07:59:50。我们发出记录,因为它是唯一的。
1.第二条记录被处理。(10151,9 hours)被添加到状态,水印更新为08:59:50。我们发出记录。由于水印现在大于事件时间(10150,8 hours),其他记录处于状态,我们从状态中删除(10150,8 hours)。现在,状态中唯一的内容是(10151,9 hours)。
1.同样的事情。(10152,10 hours)被添加到状态,水印更新为09:59:50,我们发出记录。由于水印大于(10151,9 hours)的事件时间,状态中的另一条记录,我们从状态中删除(10151,9 hours)。现在,状态中唯一的东西是(10152,10 hours)。
1.现在,为了一些微妙之处:下一条记录的uuid 10150不是全局唯一的,但是由于我们由于水印而删除了state,操作员不知道它以前是否见过10150。因此,我们实际上发出了这条记录。(10150,11 hours)被添加到state,水印更新为10:59:50,并且我们从state中删除(10152,10 hours)。
1.此时,水印为10:59:50。由于最后一条记录的时间戳小于水印,因此我们将其删除,即使它是全局唯一的。在
StreamingQueryProgress
中,您将看到numRowsDroppedByWatermark
为1。为什么会掉记录
如果您遵循了这个示例,您会注意到我们删除记录有两个原因。
1.第一个原因是我们发现了一个重复的记录。如果我们收到的记录已经存在于状态中,就会发生这种情况。
1.第二个原因是由于一个 late 记录。如果我们收到一个事件时间小于水印的记录,我们会删除它(即使它是全局唯一的)。
删除记录的第二个原因实际上与去重无关。它只与水印的语义有关。水印为我们提供了一个事件时间
t
,在此之前我们可以预期不会收到更多记录。如果引擎确实收到了一个事件时间小于t
的记录,它将忽略它。在Structured Streaming中,任何内置的有状态操作符都是如此,无论是重复数据删除、聚合还是流-流连接。
结语
withWatermark
调用,那么您应该将正在使用的事件时间列的名称传递给dropDuplicates
操作符。