我目前使用的是flink 1.6版,asyncio遇到了一个问题,性能没有达到我的预期。
我确信我在执行过程中犯了一些错误,因此任何建议/建议都将不胜感激。
问题摘要-我正在使用一个ID流。对于每一个id,我都需要呼叫rest服务。我实现了一个richasyncfunction,它执行异步rest调用。
下面是相关的代码方法和asyncinvoke方法
// these are initialized in the open method
ExecutorService executorService =
ExecutorService.newFixedThreadPool(n);
CloseableHttpAsyncClient client = ...
Gson gson = ...
public void asyncInvoke(String key, final ResultFuture<Item> resultFuture) throws Exception {
executorService.submit(new Runnable() {
client.execute(new HttpGet(new URI("http://myservice/" + key)), new FutureCallback<HttpResponse>() {
@Override
public void completed(final HttpResponse response) {
System.out.println("completed successfully");
Item item = gson.fromJson(EntityUtils.toString(response.getEntity), Item.class);
resultFuture.complete(Collections.singleton(item));
}
});
});
}
通过上述实现,我尝试了:-
提高浓缩操作的并行性
增加executor服务中的线程数
使用apachehttpasync客户端,我尝试调整连接管理器设置setdefaultmaxperroute和setmaxtotal。
我一直获得大约100个请求/秒的吞吐量。该服务每秒可处理5公里以上的数据。我做错了什么,如何改进?
暂无答案!
目前还没有任何答案,快来回答吧!