我想向一个REST端点并行发送n个请求,我想确保这些请求在不同的线程中执行以提高性能,并且需要等待所有n个请求完成。
我能想到的唯一方法是使用CountDownLatch,如下所示(请检查main()方法,这是可测试的代码):
public static void main(String args[]) throws Exception {
int n = 10; //n is dynamic during runtime
final CountDownLatch waitForNRequests = new CountDownLatch(n);
//send n requests
for (int i =0;i<n;i++) {
var r = testRestCall(""+i);
r.publishOn(Schedulers.parallel()).subscribe(res -> {
System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
waitForNRequests.countDown();
});
}
waitForNRequests.await(); //wait till all n requests finish before goto the next line
System.out.println("All n requests finished");
Thread.sleep(10000);
}
public static Mono<ResponseEntity<Map>> testRestCall(String id) {
WebClient client = WebClient.create("https://reqres.in/api");
JSONObject request = new JSONObject();
request.put("name", "user"+ id);
request.put("job", "leader");
var res = client.post().uri("/users")
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(request.toString()))
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity(Map.class)
.onErrorReturn(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build());
return res;
}
这看起来不太好,我相信有一个优雅的解决方案,不使用闩锁..等我尝试了以下方法,但我不知道如何解决以下问题:
- flux.merge(),contact()导致在单个线程中执行所有n个请求
1.如何等待n-requests完成执行(fork-join)?
List<Mono<ResponseEntity<Map>>> lst = new ArrayList<>();
int n = 10; //n is dynamic during runtime
for (int i =0;i<n;i++) {
var r = testRestCall(""+i);
lst.add(r);
}
var t= Flux.fromIterable(lst).flatMap(Function.identity()); //tried merge() contact() as well
t.publishOn(Schedulers.parallel()).subscribe(res -> {
System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
///??? all requests execute in a single thread.How to parallelize ?
});
//???How to wait till all n requests finish before goto the next line
System.out.println("All n requests finished");
Thread.sleep(10000);
- 更新日期:**
我找到了Flux订阅者在同一个线程中运行的原因,我需要创建一个ParallelFlux。所以正确的顺序应该是:
var t= Flux.fromIterable(lst).flatMap(Function.identity());
t.parallel()
.runOn(Schedulers.parallel())
.subscribe(res -> {
System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
///??? all requests execute in a single thread.How to parallelize ?
});
Ref: https://projectreactor.io/docs/core/release/reference/#advanced-parallelizing-parralelflux
1条答案
按热度按时间jei2mxaa1#
在React式中,您考虑的不是线程,而是并发性。
Reactor使用
Schedulers
抽象在少量线程上执行非阻塞/异步任务以执行任务。Schedulers
的职责与ExecutorService
非常相似。默认情况下,对于并行计划程序,线程数等于CPU内核数,但可以通过“reactor.schedulers.defaultPoolSize”系统属性控制。在您的示例中,最好使用
Flux
,然后并行处理元素以控制并发性,而不是创建多个Mono
然后合并它们。默认情况下,flatMap将处理
Queues.SMALL_BUFFER_SIZE = 256
数量的运行中内部序列。如果要按顺序处理,可以控制并发性
flatMap(item -> process(item), concurrency)
或使用concatMap
运算符。有关详细信息,请查看flatMap(...,int concurrency,int prefetch)。以下测试显示调用在不同的线程中执行
结果日志