java 为什么额外的log()会影响用于计算fromCallable()的线程池?

guz6ccqo  于 12个月前  发布在  Java
关注(0)|答案(1)|浏览(89)

我正在玩Project Reactor,面对着不直观的行为。
举个例子:

Mono.fromCallable(() -> calculate())
    //.log()
      .publishOn(Schedulers.boundedElastic())
      .doOnNext(i -> System.out.println("thread: " +   Thread.currentThread().getName()))
      .block();

当我取消注解log()时,calculate()main线程上运行,但当我保留它时,该方法在boundedElastic上执行。为什么log()会改变Mono的行为?

8i9zcol2

8i9zcol21#

如果您查看publishOn方法的实现,您会注意到它有一个特定的情况,即在Callable之后立即调用它。在这种情况下,它返回MonoSubscribeOnCallable,并且在传递给publishOn的调度程序上执行可调用。这就是为什么你会看到这样的行为。
但是,如果在fromCallablepublishOn之间添加.log运算符,则不适用此特殊情况。这是因为this指针现在引用MonoLogFuseable,因此,MonoPublishOn以默认行为返回。

public final Mono<T> publishOn(Scheduler scheduler) {
    if(this instanceof Callable) {
        if (this instanceof Fuseable.ScalarCallable) {
            try {
                T value = block();
                return onAssembly(new MonoSubscribeOnValue<>(value, scheduler));
            }
            catch (Throwable t) {
                //leave MonoSubscribeOnCallable defer error
            }
        }
        @SuppressWarnings("unchecked")
        Callable<T> c = (Callable<T>)this;
        return onAssembly(new MonoSubscribeOnCallable<>(c, scheduler));
    }
    return onAssembly(new MonoPublishOn<>(this, scheduler));
}

以下是引入此更改的提交:https://github.com/reactor/reactor-core/commit/41d9dae7256ffc36a07db7a9f5fa4d95182a5ad9
不幸的是,我找不到任何票据参考或解释,以清楚地理解为什么它的工作方式。

相关问题