我有两个流数据集,我们称之为 fastStream
以及 slowStream
.
这个 fastStream
是我通过结构化流api从kafka消费的流数据集。我预计每秒可能会收到数千条消息。
这个 slowStream
实际上是一个引用(或查找)表,它正被另一个流“upserted”,并且包含我要连接到中每个消息的数据 fastStream
在我将记录保存到表之前。这个 slowStream
只有当有人更改元数据时才会更新,这种情况随时都可能发生,但我们希望每隔几天更改一次。
中的每条记录 fastStream
将在 slowStream
我本质上是想让连接立即发生,无论数据在 slowStream
table。我不想等着看,如果新数据到达数据库,是否会发生潜在的匹配 slowStream
.
我的问题是根据spark文档:
因此,对于这两个输入流,我们将过去的输入缓冲为流状态,以便我们可以将每个未来的输入与过去的输入匹配,并相应地生成连接的结果。
我试过在网页上添加水印 fastStream
但我认为它没有任何效果,因为文档表明需要在连接中引用带水印的列
理想情况下,我会写下:
# Apply a watermark to the fast stream
fastStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/fastStream") \
.withWatermark("timestamp", "1 hour") \
.alias("fastStream")
# The slowStream cannot be watermarked since it is only slowly changing
slowStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/slowStream") \
.alias("slowStream")
# Prevent the join from buffering the fast stream by 'telling' spark that there will never be new matches.
fastStream.join(
slowStrean,
expr("""
fastStream.slow_id = slowStream.id
AND fastStream.timestamp > watermark
"""
),
"inner"
).select("fastStream.*", "slowStream.metadata")
但我不认为你可以参考 watermark
在sql表达式中。
基本上,虽然我很高兴 slowStream
缓冲(所以整个表都在内存中)我不能 fastStream
缓冲,因为此表将很快消耗所有内存。相反,我只想从 fastStream
而不是保留它们,看它们将来是否匹配。
非常感谢您的帮助。
3条答案
按热度按时间dgtucam11#
如果您对引用“带水印的时间”(即1小时)感兴趣,您可以替换
watermark
在表达式中current_timestamp - interval '1' hour
.由于您试图连接两个流,spark将坚持两者都使用水印
参考
spark流到流连接
fquxozlt2#
对于内部流连接,水印和事件时间约束(连接条件)是可选的。
如果一个无界状态在卷方面不是您的问题,您可以选择不指定它们。在这种情况下,所有数据都将被缓冲,来自faststream的数据将立即与来自slowstream的所有数据连接起来。
只有当两个参数都被指定时,您的状态才会被清除。请注意这两个参数的用途:
事件时间约束(时间范围连接条件):两个事件在各自的源上生成之间的最大时间范围是多少?
水印:事件在源和处理引擎之间传输时可以延迟的最长持续时间是多少?
要定义这两个参数,首先需要回答上述问题(引自o`reilly出版的“learning apache spark,2nd edition”一书)。
关于您的代码注解:
“通过‘告诉’spark永远不会有新的匹配项,防止联接缓冲快速流。”
记住,流连接中的缓冲是必要的。否则,您只能加入当前微批处理中可用的数据。由于slowstream没有定期更新,但是faststream更新数据的速度非常快,如果不缓冲数据,您可能永远不会得到任何连接匹配。
总的来说,对于您描述的用例(“将快速变化的数据与缓慢变化的元数据连接起来”),通常最好使用流静态连接方法,其中缓慢变化的数据成为静态部分。
在流静态联接中,流数据中的每一行都将与完整的静态数据联接,而静态表则加载在每个微批中。如果加载静态表会降低性能,您可以考虑缓存它并定期更新它,如stream static join:how to refresh(unpersist/persist)static dataframe periodically中所述。
eoxn13cs3#
用我最终得到的回答我自己的问题。这当然不理想,但对于我所有的搜索来说,spark structured streaming中似乎没有控件来处理这个用例。
所以我的解决方案是读取数据集并在
foreachBatch
. 这样我可以防止spark存储大量不必要的状态,并立即执行连接。缺点是,似乎没有办法增量读取流表,所以每次我都会重新读取整个表。。。