Kafka会话窗口的聚合行为

fiei3ece  于 2023-03-01  发布在  Apache
关注(0)|答案(1)|浏览(161)

我尝试使用类MyVeryCustomAggregator在Kafka的SessionWindows上计算聚合,该类将聚合信息存储为属性,并提供方法aggregateresult,分别用于处理新消息和检索最终结果。
我的主要困难在于实现aggregate方法所需的Merger接口,即MyVeryCustomAggregator::mergeWith,我所计算的聚合高度依赖于消息的顺序和时间戳,因此我无法像Confluent文档中那样简单地添加两个聚合器,该文档仅指出:
当窗口基于会话时,您必须另外提供一个“会话合并器”聚合器(例如,mergedAggValue = leftAggValue + rightAggValue)。
使用会话窗口时:无论何时合并两个会话,都调用会话合并。

如能就下列问题提供进一步资料,将不胜感激

  • 为什么聚合需要被合并呢?因为我没有使用宽限期,所以我希望每次会话关闭时只计算一次聚合。
  • 即使必须进行无序处理,我也希望加载以前的聚合器并用于进一步处理。
  • 什么参数被传递给合并器(agg1agg2)?这些是例如两个连续计算的聚合还是一系列聚合可以由Kafka以任意顺序合并?

示例实现可能更容易在答案中引用。

builder.stream("INPUT_TOPIC", Consumed.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomMessage.class)))
    .groupByKey(Grouped.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomMessage.class)))
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(15)))
    .aggregate(
        MyVeryCustomAggregator::new,
        (key, newValue, agg) -> agg.aggregate(newValue),
        (aggKey, agg1, agg2) -> agg1.mergeWith(agg2),
        Named.as("MyVeryCustomAggregator"),
        Materialized.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomAggregator.class))
    )
    .toStream()
    .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg.result()))
    .to("OUTPUT_TOPIC", Produced.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomOutput.class)));
x8diyxa7

x8diyxa71#

每个传入记录都会产生一个新会话(一个SessionWindow),其开始和结束时间基于该记录的时间戳。然后Kafka Streams将通过当前记录时间戳的时间戳范围- window.inactivityGap到当前记录时间戳+window. inactivityGap来搜索给定键的所有会话窗口。实际上,则可能找到一个会话窗口-它们被合并,因为只要记录到达不活动间隙内,会话的大小就将继续增长。
由于以前的会话是按时间获取的,因此可以保证按照到达的顺序应用合并,并且按照时间顺序计算合并的聚合,这不是任意的。
两个参数agg1agg2是当前聚集和要合并/聚集成会话窗口的总聚集的下一聚集,返回到前一句,因为聚集/合并是按时间顺序应用的,所以它们实际上是连续的而不是任意的。
高温加热

相关问题