为什么这个通量不在多个线程中执行?

brjng4g3  于 2021-07-03  发布在  Java
关注(0)|答案(2)|浏览(283)

我有以下例子:

Flux.just(1,2,3,4,5,6,7,8)
    .flatMap(integer -> {
                 System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId()); 
                 return Mono.just(integer);
             }, 5)
    .repeat()
    .subscribeOn(Schedulers.parallel())
    .subscribe();

日志如下:

val:4, thread:14
val:5, thread:14
val:6, thread:14
val:7, thread:14
val:8, thread:14
val:1, thread:14
val:2, thread:14
val:3, thread:14

为什么到处都是同一根线??如何修改示例,使其在多个线程中执行?

qyyhg6bp

qyyhg6bp1#

如果希望每个重复的通量位于不同的线程上,可以移动 publishOn 以前,像这样:

Flux.just(1,2,3,4,5,6,7,8)
        .publishOn(Schedulers.parallel()) // <- before
        .flatMap(integer -> {
           System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId()); 
           return Mono.just(integer);
        }, 5)
        .repeat()
        .subscribe();

现在的输出是这样的:

val:1, thread:20
val:2, thread:20
val:3, thread:20
val:4, thread:20
val:5, thread:20
val:6, thread:20
val:7, thread:20
val:8, thread:20
val:1, thread:13
val:2, thread:13
val:3, thread:13
val:4, thread:13
val:5, thread:13
val:6, thread:13
val:7, thread:13
val:8, thread:13

如果希望每个整数位于不同的线程中,可以执行以下操作:

Flux.just(1,2,3,4,5,6,7,8)
        .publishOn(Schedulers.parallel()) // <- Each flux can be published in a different thread
        .flatMap(integer -> {
            return Mono.fromCallable(() -> {
                 System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId()); 
                 return integer;
            }).publishOn(Schedulers.parallel()); // <- Each Mono processing every integer can be processed in a different thread
         })
        .repeat()
        .subscribe();

输出变为:

val:3, thread:16
val:2, thread:15
val:7, thread:20
val:8, thread:13
val:5, thread:18
val:6, thread:19
val:3, thread:17
val:5, thread:19
val:6, thread:20
val:1, thread:15
val:8, thread:14
val:4, thread:18
val:7, thread:13
tyg4sfes

tyg4sfes2#

你需要使用 parallel 操作如下:

Flux.just(1,2,3,4,5,6,7,8)
    .parallel(2) // mention number of threads
    .runOn(Schedulers.parallel())
    .map(integer -> {
             System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId()); 
             return integer;
        })   
    .subscribe();

相关问题