(twitter)storm的聚合窗口

r1zk6ea1  于 2021-06-26  发布在  Storm
关注(0)|答案(2)|浏览(324)

我在玩storm,我想知道storm在哪里指定(如果可能的话)聚合时的(滚动/滑动)窗口大小。e、 如果我们想在twitter上找到前一个小时的热门主题。我们如何指定螺栓每小时返回一次结果?这是在每个螺栓内编程完成的吗?或者是用某种方式来指定一个“窗口”?

omjgkv6w

omjgkv6w1#

添加一个新的并行度为1的喷口,让它发出一个空信号,然后utils.sleep直到下一次(全部在nexttuple中完成)。然后,使用所有分组将所有相关的螺栓链接到该喷口,这样它们的所有示例都将收到相同的信号。

inn6fuwd

inn6fuwd2#

免责声明:我写了一篇由gakhov在上面的回答中引用的关于风暴的文章。
我认为最佳实践是在Storm0.8+中使用所谓的tick元组。通过这些,您可以配置自己的喷口/螺栓,以便在特定的时间间隔(例如,每10秒或每分钟)收到通知。
下面是一个简单的示例,它将相关组件配置为每10秒接收一次tick元组:

// in your spout/bolt
@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    int tickFrequencyInSeconds = 10;
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
    return conf;
}

然后可以在喷口/螺栓中使用条件开关 execute() 方法来区分“正常”传入元组和特殊tick元组。例如:

// in your spout/bolt
@Override
public void execute(Tuple tuple) {
    if (isTickTuple(tuple)) {
        // now you can trigger e.g. a periodic activity
    }
    else {
        // do something with the normal tuple
    }
}

private static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

几天前,我又写了一篇非常详细的博客文章,讲述了如何在Storm中做到这一点,正如gakhov所指出的(无耻的插头!)。

相关问题