// in your spout/bolt
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
int tickFrequencyInSeconds = 10;
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
return conf;
}
// in your spout/bolt
@Override
public void execute(Tuple tuple) {
if (isTickTuple(tuple)) {
// now you can trigger e.g. a periodic activity
}
else {
// do something with the normal tuple
}
}
private static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
2条答案
按热度按时间omjgkv6w1#
添加一个新的并行度为1的喷口,让它发出一个空信号,然后utils.sleep直到下一次(全部在nexttuple中完成)。然后,使用所有分组将所有相关的螺栓链接到该喷口,这样它们的所有示例都将收到相同的信号。
inn6fuwd2#
免责声明:我写了一篇由gakhov在上面的回答中引用的关于风暴的文章。
我认为最佳实践是在Storm0.8+中使用所谓的tick元组。通过这些,您可以配置自己的喷口/螺栓,以便在特定的时间间隔(例如,每10秒或每分钟)收到通知。
下面是一个简单的示例,它将相关组件配置为每10秒接收一次tick元组:
然后可以在喷口/螺栓中使用条件开关
execute()
方法来区分“正常”传入元组和特殊tick元组。例如:几天前,我又写了一篇非常详细的博客文章,讲述了如何在Storm中做到这一点,正如gakhov所指出的(无耻的插头!)。