我尝试使用类MyVeryCustomAggregator
在Kafka的SessionWindows上计算聚合,该类将聚合信息存储为属性,并提供方法aggregate
和result
,分别用于处理新消息和检索最终结果。
我的主要困难在于实现aggregate
方法所需的Merger
接口,即MyVeryCustomAggregator::mergeWith
,我所计算的聚合高度依赖于消息的顺序和时间戳,因此我无法像Confluent文档中那样简单地添加两个聚合器,该文档仅指出:
当窗口基于会话时,您必须另外提供一个“会话合并器”聚合器(例如,mergedAggValue = leftAggValue + rightAggValue)。
使用会话窗口时:无论何时合并两个会话,都调用会话合并。
如能就下列问题提供进一步资料,将不胜感激
- 为什么聚合需要被合并呢?因为我没有使用宽限期,所以我希望每次会话关闭时只计算一次聚合。
- 即使必须进行无序处理,我也希望加载以前的聚合器并用于进一步处理。
- 什么参数被传递给合并器(
agg1
和agg2
)?这些是例如两个连续计算的聚合还是一系列聚合可以由Kafka以任意顺序合并?
示例实现可能更容易在答案中引用。
builder.stream("INPUT_TOPIC", Consumed.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomMessage.class)))
.groupByKey(Grouped.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomMessage.class)))
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(15)))
.aggregate(
MyVeryCustomAggregator::new,
(key, newValue, agg) -> agg.aggregate(newValue),
(aggKey, agg1, agg2) -> agg1.mergeWith(agg2),
Named.as("MyVeryCustomAggregator"),
Materialized.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomAggregator.class))
)
.toStream()
.map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg.result()))
.to("OUTPUT_TOPIC", Produced.with(Serdes.String(), CustomSerdes.Json(MyVeryCustomOutput.class)));
1条答案
按热度按时间x8diyxa71#
每个传入记录都会产生一个新会话(一个
SessionWindow
),其开始和结束时间基于该记录的时间戳。然后Kafka Streams将通过当前记录时间戳的时间戳范围- window.inactivityGap到当前记录时间戳+window. inactivityGap来搜索给定键的所有会话窗口。实际上,则可能找到一个会话窗口-它们被合并,因为只要记录到达不活动间隙内,会话的大小就将继续增长。由于以前的会话是按时间获取的,因此可以保证按照到达的顺序应用合并,并且按照时间顺序计算合并的聚合,这不是任意的。
两个参数
agg1
和agg2
是当前聚集和要合并/聚集成会话窗口的总聚集的下一聚集,返回到前一句,因为聚集/合并是按时间顺序应用的,所以它们实际上是连续的而不是任意的。高温加热