订阅或发布不使用reactivecassandracrudrepository

camsedfj  于 2021-06-13  发布在  Cassandra
关注(0)|答案(1)|浏览(451)

我们在springwebflux中使用React式spring数据存储库,我对subscribeon的理解是,它决定了subscribeon之前的操作符将在flux中执行哪个线程池,而publishon决定了订阅将在哪个线程池上执行。然而,在下面的代码中,即使使用publishon和subscribeon,代码也不会在主线程上执行,而是返回到cluster-nio-worker-1。

System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
    personRepository.findAll().log()
            .map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
            .subscribeOn(Schedulers.immediate())
            .publishOn(Schedulers.immediate())
            .subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
                    excp -> excp.printStackTrace(),
                    () -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1

线程[cluster-nio-worker-1,5,main]是什么意思?为什么这些方法调用不使用主线程执行呢。

9rnv2umw

9rnv2umw1#

subscribeon方法使发布者使用给定的线程池来发布值。可能有n个 subscribeOn 管道中的方法。最严厉的一个将生效。 personRepository.findAll().log() 是 Package 器并返回通量。因此,如果它在内部使用任何调度程序,则不能使用subscribeon更改它。例如, interval 方法使用parallel,我不能将其更改为boundedelastic,如图所示。

Flux.interval(Duration.ofSeconds(1))
            .subscribeOn(Schedulers.boundedElastic())
``` `Schedulers.immediate` 只是保持管道执行在同一个线程中。它不是主要的,在你的情况下,它将是 `cluster-nio-worker` 线程池。
我们可以从主线程池切换到任何调度程序线程池。但是我们不能将执行切换回主线程。这不是项目React堆的限制。这应该是java本身的局限性。

相关问题