防止spark在流/流连接中存储状态

ebdffaop  于 2021-07-09  发布在  Spark
关注(0)|答案(3)|浏览(645)

我有两个流数据集,我们称之为 fastStream 以及 slowStream .
这个 fastStream 是我通过结构化流api从kafka消费的流数据集。我预计每秒可能会收到数千条消息。
这个 slowStream 实际上是一个引用(或查找)表,它正被另一个流“upserted”,并且包含我要连接到中每个消息的数据 fastStream 在我将记录保存到表之前。这个 slowStream 只有当有人更改元数据时才会更新,这种情况随时都可能发生,但我们希望每隔几天更改一次。
中的每条记录 fastStream 将在 slowStream 我本质上是想让连接立即发生,无论数据在 slowStream table。我不想等着看,如果新数据到达数据库,是否会发生潜在的匹配 slowStream .
我的问题是根据spark文档:
因此,对于这两个输入流,我们将过去的输入缓冲为流状态,以便我们可以将每个未来的输入与过去的输入匹配,并相应地生成连接的结果。
我试过在网页上添加水印 fastStream 但我认为它没有任何效果,因为文档表明需要在连接中引用带水印的列
理想情况下,我会写下:


# Apply a watermark to the fast stream

fastStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/fastStream") \
.withWatermark("timestamp", "1 hour") \
.alias("fastStream")

# The slowStream cannot be watermarked since it is only slowly changing

slowStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/slowStream") \
.alias("slowStream")

# Prevent the join from buffering the fast stream by 'telling' spark that there will never be new matches.

fastStream.join( 
  slowStrean,
  expr(""" 
    fastStream.slow_id = slowStream.id
    AND fastStream.timestamp > watermark
    """
  ),
  "inner"
).select("fastStream.*", "slowStream.metadata")

但我不认为你可以参考 watermark 在sql表达式中。
基本上,虽然我很高兴 slowStream 缓冲(所以整个表都在内存中)我不能 fastStream 缓冲,因为此表将很快消耗所有内存。相反,我只想从 fastStream 而不是保留它们,看它们将来是否匹配。
非常感谢您的帮助。

dgtucam1

dgtucam11#

如果您对引用“带水印的时间”(即1小时)感兴趣,您可以替换 watermark 在表达式中 current_timestamp - interval '1' hour .
由于您试图连接两个流,spark将坚持两者都使用水印

参考

spark流到流连接

fquxozlt

fquxozlt2#

对于内部流连接,水印和事件时间约束(连接条件)是可选的。
如果一个无界状态在卷方面不是您的问题,您可以选择不指定它们。在这种情况下,所有数据都将被缓冲,来自faststream的数据将立即与来自slowstream的所有数据连接起来。
只有当两个参数都被指定时,您的状态才会被清除。请注意这两个参数的用途:
事件时间约束(时间范围连接条件):两个事件在各自的源上生成之间的最大时间范围是多少?
水印:事件在源和处理引擎之间传输时可以延迟的最长持续时间是多少?
要定义这两个参数,首先需要回答上述问题(引自o`reilly出版的“learning apache spark,2nd edition”一书)。
关于您的代码注解:
“通过‘告诉’spark永远不会有新的匹配项,防止联接缓冲快速流。”
记住,流连接中的缓冲是必要的。否则,您只能加入当前微批处理中可用的数据。由于slowstream没有定期更新,但是faststream更新数据的速度非常快,如果不缓冲数据,您可能永远不会得到任何连接匹配。
总的来说,对于您描述的用例(“将快速变化的数据与缓慢变化的元数据连接起来”),通常最好使用流静态连接方法,其中缓慢变化的数据成为静态部分。
在流静态联接中,流数据中的每一行都将与完整的静态数据联接,而静态表则加载在每个微批中。如果加载静态表会降低性能,您可以考虑缓存它并定期更新它,如stream static join:how to refresh(unpersist/persist)static dataframe periodically中所述。

eoxn13cs

eoxn13cs3#

用我最终得到的回答我自己的问题。这当然不理想,但对于我所有的搜索来说,spark structured streaming中似乎没有控件来处理这个用例。
所以我的解决方案是读取数据集并在 foreachBatch . 这样我可以防止spark存储大量不必要的状态,并立即执行连接。缺点是,似乎没有办法增量读取流表,所以每次我都会重新读取整个表。。。

def join_slow_stream(df, batchID):

  # Read as a table rather than a stream
  slowdf = spark.read \
    .format("delta") \
    .load("dbfs:/mnt/some_file/slowStream") \
    .alias("slowStream")

  out_df = df.join(
    slowdf,
    expr(""" 
      fastStream.slow_id = slowStream.id
      """
    ),
    "inner"
  ).select("fastStream.*", "slowStream.metadata")

  # write data to database
  db_con.write(out_df)

fastStream.writeStream.foreachBatch(join_slow_stream)

相关问题