流使用风暴喷口元组触发执行进一步的计算在某些时候

rn0zuynd  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(256)

我想连接一些风暴螺栓到触发喷口,它发射元组,比如说每3-4小时或在一天中的某些时间。当连接的bolt从这个triggerpoute接收到一个元组时,应该开始对聚合数据(来自另一个kafka口)进行进一步的计算(在这种情况下,特征提取用于馈送后续mlbolt)
现在,我能用nexttuple()方法让一个spout暴露这个行为吗?这是要走的路还是有人能提出更好的方法?这是一个集群的全球时钟。
问候,谢谢

sbdsn5lh

sbdsn5lh1#

不用触发器,你可以在你的螺栓上加上记号。将以下内容添加到bolt实现中:

import backtype.storm.Constants;

@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 15);
    return conf;
}

public static boolean isTickTuple(Tuple tuple) {
    String sourceComponent = tuple.getSourceComponent();
    String sourceStreamId = tuple.getSourceStreamId();
    return sourceComponent.equals(Constants.SYSTEM_COMPONENT_ID)
            && sourceStreamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
}

在execute()方法中,首先检查输入是否是tick元组,然后继续执行逻辑。

相关问题