rest调用的风暴转换(async,future,netty)

6jjcrrmo  于 2021-06-21  发布在  Storm
关注(0)|答案(0)|浏览(170)

假设storm每秒接收1000条tweet,在这个过程中,它需要将它们归类为垃圾邮件。我有一个集群,例如20台机器,通过restapi提供“分类”微服务,它们可以提供每秒10k tweets的最大吞吐量,延迟为3秒。这意味着在最坏的情况下,我可能会有3万条推特在飞,这没关系。我猜要从storm中使用此服务,实现将如下所示:

public static class RestBolt implements IRichBolt {
    ...
    @Override
    public void execute(Tuple tuple) {
        String classes = (new Post('http://my.classifier.com', data = tuple.getString(0)));
        _collector.emit(tuple, new Values(classes));
        _collector.ack(tuple);
    }
    ...
}

topologyBuilder.setBolt("rest-bolt", new RestBolt(), 30000);

现在,考虑到这个api,我猜storm除了启动30k线程之外别无选择,这可能会很糟糕。我在源代码中看到storm使用netty,我猜它可以通过使用storm调用更有效地支持此操作。。。如果存在虚构的美丽的netty、storm和java api,则看起来像这样:

public static class RestBolt implements IRichBolt {
    ...
    @Override
    public void execute(Tuple tuple) {
        Future<String> classes = (new AsyncPost('http://my.classifier.com', data = tuple.getString(0)));
        _collector.emit(tuple, new FutureValues(classes));
        _collector.ack(tuple);
    }
    ...
}

有没有一种方法可以在storm中使用很少的线程来使用异步调用来获得巨大的可伸缩性?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题