nexttuple()在storm上使用baserichspout调用无限次

q1qsirdb  于 2021-06-21  发布在  Storm
关注(0)|答案(2)|浏览(300)

我实现了一个简单的storm拓扑,它有一个喷口和一个在本地集群模式下运行的螺栓。
出于某些原因,多次调用了喷口的nexttuple()。
知道为什么吗?
代码:
喷口:

public class CommitFeedListener extends BaseRichSpout {
    private SpoutOutputCollector outputCollector;
    private List<String> commits;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("commit"));
    }

    @Override
    public void open(Map configMap,
                     TopologyContext context,
                     SpoutOutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

  **//that method is invoked more than once**
    @Override
    public void nextTuple() {

            outputCollector.emit(new Values("testValue"));

    }
}

螺栓:

public class EmailExtractor extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("email"));
    }
    @Override
    public void execute(Tuple tuple,
                        BasicOutputCollector outputCollector) {
        String commit = tuple.getStringByField("commit");
        System.out.println(commit);        
    }  
}

运行配置:

public class LocalTopologyRunner {
    private static final int TEN_MINUTES = 600000;
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("commit-feed-listener", new CommitFeedListener());
                builder
        .setBolt("email-extractor", new EmailExtractor())
                .shuffleGrouping("commit-feed-listener");
        Config config = new Config();
        config.setDebug(true);
        StormTopology topology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("github-commit-count-topology",
                config,
                topology);
        Utils.sleep(TEN_MINUTES);
        cluster.killTopology("github-commit-count");
        cluster.shutdown();
    }
}

谢谢大家,雷。

xqnpmsa8

xqnpmsa81#

创建一些标志,必要时设置它怎么样?

if (completed) {
    try {
        Utils.sleep(pollIntervalInMilliseconds);
    } catch (InterruptedException e) {
        // Do nothing
    }
    return;
}
rkkpypqq

rkkpypqq2#

nexttuple()是设计在无限循环中调用的。例如,使用对外部资源(数据库、流、io等)的脏检查是这样做的。
如果在nexttuple()中无事可做,则应休眠一段时间以防止backtype.storm.utils.utils对cpu进行垃圾邮件处理

Utils.sleep(pollIntervalInMilliseconds);

storm是一个实时处理架构,因此它确实是正确的行为。检查一些样品,看看如何实施喷口根据您的需要。

相关问题