I implemented a simple Storm topology with one spout and one bolt, running in local cluster mode.
For some reason, the spout's nextTuple() is called many times.
Any idea why?
Code:
Spout:
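// Imports assume the pre-1.0 backtype.storm packages (org.apache.storm in Storm 1.0+).
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;

public class CommitFeedListener extends BaseRichSpout {
    private SpoutOutputCollector outputCollector;
    private List<String> commits;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Tuples emitted by this spout carry a single field named "commit".
        declarer.declare(new Fields("commit"));
    }

    @Override
    public void open(Map configMap,
                     TopologyContext context,
                     SpoutOutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    // This method is invoked more than once.
    @Override
    public void nextTuple() {
        outputCollector.emit(new Values("testValue"));
    }
}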
Bolt:
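import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class EmailExtractor extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("email"));
    }

    @Override
    public void execute(Tuple tuple,
                        BasicOutputCollector outputCollector) {
        // Print every "commit" value received from the spout.
        String commit = tuple.getStringByField("commit");
        System.out.println(commit);
    }
}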
Runner:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class LocalTopologyRunner {
    private static final int TEN_MINUTES = 600000;
    // killTopology() must use the same name that was passed to submitTopology().
    private static final String TOPOLOGY_NAME = "github-commit-count-topology";

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("commit-feed-listener", new CommitFeedListener());
        builder
            .setBolt("email-extractor", new EmailExtractor())
            .shuffleGrouping("commit-feed-listener");

        Config config = new Config();
        config.setDebug(true);

        StormTopology topology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, topology);

        Utils.sleep(TEN_MINUTES);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
Thanks, everyone. 雷
2 answers

Answer 1 (xqnpmsa8):
How about creating a flag and setting it when appropriate?
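A minimal sketch of the flag idea, written as plain Java so it runs without Storm on the classpath. The class name `EmitOnceSpout` and the list standing in for `SpoutOutputCollector` are hypothetical; in a real topology the `emitted` field would live in the `BaseRichSpout` subclass and the emit would go through `outputCollector.emit(new Values(...))`. The loop in `main` imitates Storm calling `nextTuple()` over and over.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a spout guarded by a flag; not Storm API.
public class EmitOnceSpout {
    // Set after the first emit so later nextTuple() calls do nothing.
    private boolean emitted = false;

    // Hypothetical stand-in for SpoutOutputCollector: records emitted values.
    private final List<String> emittedValues = new ArrayList<>();

    public void nextTuple() {
        if (emitted) {
            return; // nothing left to emit
        }
        emittedValues.add("testValue");
        emitted = true;
    }

    public List<String> getEmittedValues() {
        return emittedValues;
    }

    public static void main(String[] args) {
        EmitOnceSpout spout = new EmitOnceSpout();
        // Simulate the executor calling nextTuple() repeatedly.
        for (int i = 0; i < 5; i++) {
            spout.nextTuple();
        }
        System.out.println(spout.getEmittedValues()); // [testValue]
    }
}
```

The flag only stops duplicate emits; nextTuple() itself keeps being called, so an idle spout should still avoid busy-spinning.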
Answer 2 (rkkpypqq):
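nextTuple() is designed to be called in an infinite loop. That is how a spout is typically implemented when it polls (dirty-checks) an external resource such as a database, a stream, or other I/O.
If there is nothing to do in nextTuple(), it should sleep for a short time (for example with backtype.storm.utils.Utils.sleep()) so that it does not spin the CPU.
Storm is a real-time processing framework, so this is the intended behavior. Look at some sample spouts to see how to implement one for your needs.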
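The polling pattern described above can be sketched in plain Java (no Storm dependency) as follows. Everything here is a hypothetical stand-in: the `ConcurrentLinkedQueue` plays the external source, the list plays `SpoutOutputCollector`, and `Thread.sleep` is used where Storm code would more likely call `Utils.sleep()`. The 1 ms back-off is an arbitrary illustrative value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Plain-Java model of a polling spout; not Storm API.
public class PollingSpout {
    // Hypothetical idle back-off in milliseconds.
    private static final long IDLE_SLEEP_MS = 1;

    private final Queue<String> source;                 // stand-in for an external resource
    private final List<String> emitted = new ArrayList<>(); // stand-in for the collector
    private int idleCalls = 0;

    public PollingSpout(Queue<String> source) {
        this.source = source;
    }

    // Called repeatedly by the (simulated) executor loop.
    public void nextTuple() {
        String next = source.poll(); // non-blocking check of the source
        if (next == null) {
            idleCalls++;
            try {
                Thread.sleep(IDLE_SLEEP_MS); // back off instead of spinning the CPU
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
            return;
        }
        emitted.add(next);
    }

    public List<String> getEmitted() { return emitted; }

    public int getIdleCalls() { return idleCalls; }

    public static void main(String[] args) {
        Queue<String> source = new ConcurrentLinkedQueue<>();
        source.add("commit-1");
        source.add("commit-2");
        PollingSpout spout = new PollingSpout(source);
        // Stand-in for Storm's endless loop: a fixed number of calls.
        for (int i = 0; i < 5; i++) {
            spout.nextTuple();
        }
        System.out.println(spout.getEmitted());   // [commit-1, commit-2]
        System.out.println(spout.getIdleCalls()); // 3
    }
}
```

Polling without blocking keeps each nextTuple() call short, which matters because Storm calls it from the same thread that handles acks and fails for the spout.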