java Spring 网络流量|如何等待一系列Monos并行执行完毕

r3i60tvu  于 2023-01-01  发布在  Java
关注(0)|答案(1)|浏览(152)

我想向一个REST端点并行发送n个请求,我想确保这些请求在不同的线程中执行以提高性能,并且需要等待所有n个请求完成。
我能想到的唯一方法是使用CountDownLatch,如下所示(请检查main()方法,这是可测试的代码):

public static void main(String args[]) throws Exception {
            
            int n = 10; //n is dynamic during runtime
    
            final CountDownLatch waitForNRequests = new CountDownLatch(n);
            //send n requests
            for (int i =0;i<n;i++) {
                var r = testRestCall(""+i);
                r.publishOn(Schedulers.parallel()).subscribe(res -> {
                    System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
                    
                    waitForNRequests.countDown();
                });
            }
    
    
            waitForNRequests.await(); //wait till all n requests finish before goto the next line
    
            System.out.println("All n requests finished");
            Thread.sleep(10000); 
        }

 

public static Mono<ResponseEntity<Map>> testRestCall(String id) {
    
            WebClient client = WebClient.create("https://reqres.in/api");
    
            JSONObject request = new JSONObject();
            request.put("name", "user"+ id);
            request.put("job", "leader");
    
            var res = client.post().uri("/users")
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(BodyInserters.fromValue(request.toString()))
    
                    .accept(MediaType.APPLICATION_JSON)
                    .retrieve()
                    .toEntity(Map.class)
                    .onErrorReturn(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build());
            return res;
    
        }

这看起来不太好,我相信有一个优雅的解决方案,不使用闩锁..等我尝试了以下方法,但我不知道如何解决以下问题

  1. flux.merge(),contact()导致在单个线程中执行所有n个请求
    1.如何等待n-requests完成执行(fork-join)?
List<Mono<ResponseEntity<Map>>> lst = new ArrayList<>();
int n = 10; //n is dynamic during runtime
for (int i =0;i<n;i++) {
    var r = testRestCall(""+i);
    lst.add(r);
}

var t= Flux.fromIterable(lst).flatMap(Function.identity()); //tried merge() contact() as well
t.publishOn(Schedulers.parallel()).subscribe(res -> {
            System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
            ///??? all requests execute in a single thread.How to parallelize ?
        });

//???How to wait till all n requests finish before goto the next line
System.out.println("All n requests finished");

Thread.sleep(10000);
    • 更新日期:**

我找到了Flux订阅者在同一个线程中运行的原因,我需要创建一个ParallelFlux。所以正确的顺序应该是:

var t= Flux.fromIterable(lst).flatMap(Function.identity()); 
t.parallel()
 .runOn(Schedulers.parallel())
 .subscribe(res -> {
            System.out.println(">>>>>>> Thread: " + Thread.currentThread().getName() + " response:" +res.getBody());
            ///??? all requests execute in a single thread.How to parallelize ?
        });

Ref: https://projectreactor.io/docs/core/release/reference/#advanced-parallelizing-parralelflux

jei2mxaa

jei2mxaa1#

在React式中,您考虑的不是线程,而是并发性。
Reactor使用Schedulers抽象在少量线程上执行非阻塞/异步任务以执行任务。Schedulers的职责与ExecutorService非常相似。默认情况下,对于并行计划程序,线程数等于CPU内核数,但可以通过“reactor.schedulers.defaultPoolSize”系统属性控制。
在您的示例中,最好使用Flux,然后并行处理元素以控制并发性,而不是创建多个Mono然后合并它们。

Flux.range(1, 10)
    .flatMap(this::testRestCall)

默认情况下,flatMap将处理Queues.SMALL_BUFFER_SIZE = 256数量的运行中内部序列。
如果要按顺序处理,可以控制并发性flatMap(item -> process(item), concurrency)或使用concatMap运算符。有关详细信息,请查看flatMap(...,int concurrency,int prefetch)。

Flux.range(1, 10)
    .flatMap(i -> testRestCall(i), 5)

以下测试显示调用在不同的线程中执行

@Test
void testParallel() {
    var flow = Flux.range(1, 10)
            .flatMap(i -> testRestCall(i))
            .log()
            .then(Mono.just("complete"));

    StepVerifier.create(flow)
            .expectNext("complete")
            .verifyComplete();

}

结果日志

2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-4] reactor.Mono.FlatMap.3                   : | onComplete()
2022-12-30 21:31:25.170  INFO 43383 --- [ctor-http-nio-3] reactor.Mono.FlatMap.2                   : | onComplete()
2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-2] reactor.Mono.FlatMap.1                   : | onComplete()
2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-8] reactor.Mono.FlatMap.7                   : | onComplete()
2022-12-30 21:31:25.169  INFO 43383 --- [tor-http-nio-11] reactor.Mono.FlatMap.10                  : | onComplete()
2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-7] reactor.Mono.FlatMap.6                   : | onComplete()
2022-12-30 21:31:25.169  INFO 43383 --- [ctor-http-nio-9] reactor.Mono.FlatMap.8                   : | onComplete()
2022-12-30 21:31:25.170  INFO 43383 --- [ctor-http-nio-6] reactor.Mono.FlatMap.5                   : | onComplete()
2022-12-30 21:31:25.378  INFO 43383 --- [ctor-http-nio-5] reactor.Mono.FlatMap.4                   : | onComplete()

相关问题