Flink1.6异步io—如何在使用rest服务调用丰富流时提高吞吐量?

rekjcdws  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(232)

我目前使用的是flink 1.6版,asyncio遇到了一个问题,性能没有达到我的预期。
我确信我在执行过程中犯了一些错误,因此任何建议/建议都将不胜感激。
问题摘要-我正在使用一个ID流。对于每一个id,我都需要呼叫rest服务。我实现了一个richasyncfunction,它执行异步rest调用。
下面是相关的代码方法和asyncinvoke方法

// these are initialized in the open method
ExecutorService executorService = 
ExecutorService.newFixedThreadPool(n);
CloseableHttpAsyncClient client = ...
Gson gson = ...

public void asyncInvoke(String key, final ResultFuture<Item> resultFuture) throws Exception {

    executorService.submit(new Runnable() {

        client.execute(new HttpGet(new URI("http://myservice/" + key)), new FutureCallback<HttpResponse>() {
             @Override
                public void completed(final HttpResponse response) {
                    System.out.println("completed successfully");
                    Item item = gson.fromJson(EntityUtils.toString(response.getEntity), Item.class);
                    resultFuture.complete(Collections.singleton(item));
                }
        });
    });

}
通过上述实现,我尝试了:-
提高浓缩操作的并行性
增加executor服务中的线程数
使用apachehttpasync客户端,我尝试调整连接管理器设置setdefaultmaxperroute和setmaxtotal。
我一直获得大约100个请求/秒的吞吐量。该服务每秒可处理5公里以上的数据。我做错了什么,如何改进?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题