风暴流喷口处理中的下一步方法问题

pkln4tw6  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(328)

我正在storm上开发一些数据分析算法,对storm的内部设计有一些疑问。我想模拟一个传感器数据在storm中的产生和处理,因此我使用了spout,通过在nexttuple method of spout中设置一个休眠方法,以固定的时间间隔将传感器数据推送到后续的螺栓中。但从实验结果来看,喷口并没有按规定的速率推送数据。在实验中,系统没有出现瓶颈螺栓。
然后我查了一些关于ack和nexttuple方法的资料。现在我的疑问是,是否只有当前面的元组在ack方法中被完全处理和确认时,nexttuple方法才被调用?
如果这是真的,是否意味着我不能设置一个固定的时间间隔来发送数据?
太多了!

j8ag8udp

j8ag8udp1#

我的经验是,你不应该指望风暴作出任何实时保证,包括在你的情况下元组处理率。您当然可以编写一个只在某个时间日程表上发出元组的喷口,但storm不能保证它总是按您所希望的频率调用喷口。
请注意,只要拓扑中有空间容纳更多挂起的元组,就应该调用nexttuple。如果拓扑有空闲容量,我希望storm尽可能用它能得到的任何东西来填充它。

0sgqnhkj

0sgqnhkj2#

我有一个类似的用例,我实现它的方法是使用 TICK_TUPLE ```
Config tickConfig = new Config();
tickConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 15);
...
...
builder.setBolt("storage_bolt", new S3Bolt(), 4).fieldsGrouping("shuffle_bolt", new Fields("hash")).addConfigurations(tickConfig);

然后在我的 `storage_bolt` (注意它是用 `python` ,但你会得到一个想法)我检查消息是否 `tick_tuple` 如果是,则执行我的代码:

def process(self, tup):
if tup.stream == '__tick':
# Your logic that need to be executed every 15 seconds,
# or what ever you specified in tickConfig.
# NOTE: the maximum time is 600 s.
storm.ack(tup)
return

相关问题