目前,我试图寻找风暴的消息处理。我觉得滑动窗口的功能很有趣,并尝试让它工作。
但是即使我把时间间隔设置为5秒,窗口后面的计算也要频繁得多。似乎每个新消息都会执行tuple窗口的execute方法。
builder.setBolt("messageCountBolt",
new MessageCountBolt()
.withWindow(
new BaseWindowedBolt.Duration(20, TimeUnit.SECONDS),
new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS))
.withMessageIdField("id")
.withTimestampField("timeStamp")
.withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS)),
1).globalGrouping("spout");
有人知道为什么吗?我希望计算在5秒内等待所有消息。
2条答案
按热度按时间nhjlsmyf1#
你必须使用
withTumblingWindow
相反withWindow
.withWindow
对每个输入元组执行,并传递包含最后一个输入消息的输入批。但是withTumblingWindow
将在一个批中聚合所有输入消息,并在一个批中传递整个消息。falq053o2#
我认为原因是您使用的是slidingwindow—它为该窗口中的每个入口和出口生成一个输出。如果您只希望在窗口末尾有一个输出,那么最好使用批处理窗口或滚动窗口。总结如下:
滑动窗口:将每个事件保持在给定的时间窗口内,每当添加或删除新事件时都生成一个输出。
批处理窗口:也称为滚动窗口,它们只在时间窗口结束时产生输出。