我是一个有点新手Kafka流工作,但我注意到的是一个行为,我不期待。我已经开发了一个应用程序,是从6个主题消费。我的目标是通过一个内部字段对每个主题的事件进行分组(或加入)。很好用。但我的问题是窗口时间,它看起来像是每个周期的结束时间影响到所有的聚合所占用的时间。对于同时进行的所有聚合,是否只有一个计时器?。我希望当流得到配置的30秒时,就可以退出聚合过程。我认为这是可能的,因为我看到了windowedregion变量和windowedregion.window().start()和windowedregion.window().end()的数据,每个流的值都不同。这是我的密码:
streamsBuilder
.stream(topicList, Consumed.with(Serdes.String(), Serdes.String()))
.groupBy(new MyGroupByKeyValueMapper(), Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(windowInactivity).until(windowDuration))
.aggregate(
new MyInitializer(),
new MyAggregator(),
new MyMerger(),
Materialized.with(new Serdes.StringSerde(), new PaymentListSerde())
)
.mapValues(
new MyMapper()
)
.toStream(new MyKeyValueMapper())
.to(consolidationTopic,Produced.with(Serdes.String(), Serdes.String()));
1条答案
按热度按时间lb3vh1jj1#
我不确定这是否是您要问的问题,但每个聚合(每个键会话窗口)可能确实会更新多次。通常情况下,在“整合”主题的会话窗口中,每个窗口不会只收到一条消息和最终结果。这里更详细地解释了这一点:https://stackoverflow.com/a/38945277/7897191