Spring reactor提供了blocklast(),如果我想保持同步并阻塞直到所有元素都完成
但是如果我想继续一点,然后阻塞,直到所有元素都完成了呢?
(我不想使用isDisposed执行忙碌等待)
我需要自己用onComplete触发的信号来做吗?还是有更好的内置API?
//reactor provides blocklast if I want to be synchronous and block until all elements are done
//Integer data = Flux.range(1,10).delayElements(Duration.ofSeconds(1)).doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName())).blockLast();
//but what if I want to continue a bit and then block
//do I need to do it myself like this? is there a better way?
Object signal = new Object();
Flux.range(1,10).delayElements(Duration.ofSeconds(1)).doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName())).doOnComplete(()->{synchronized(signal) { signal.notify();}}).subscribe();
// do some other work here, then wait for done
synchronized (signal) {
signal.wait();
}
System.out.println("All done");
4条答案
按热度按时间izj3ouym1#
遗憾的是,没有内置的API来实现这个目的,但是有更好的方法来实现这个目的。如果Flux在你完成其他工作之前完成,你的
Object.wait
将无限期地挂起。你可以用多种方法来解决这个问题:使用第二个发布服务器监视完成情况
这里我使用
Sinks.Empty
创建了第二个发布者,它的唯一目的是在最初的Flux完成时发出onComplete信号。我们将Sinks.Empty.emitEmpty()
触发器放入Flux.doOnComplete()
挂钩,然后使用.asMono().block()
阻塞。如果您晚订阅,Mono将立即完成:将您的其他作品放到其他Publisher中
或者,您可以使用
Mono.fromRunnable
将其他工作放在第二个发布服务器中,然后使用Flux.merge
同时订阅这两个发布服务器。Flux.merge
将从这两个发布服务器发出所有项目,并且仅当这两个发布服务器都完成时才完成。由于Mono.fromRunnable
不发出任何项目,因此生成的Flux将仅包含原始Flux的项目。请注意,
Flux.merge
会将两个“出版者”放在相同的排程器上,如果otherWork不是异步的,或原始的Flux永远无法完成,就会造成问题。您可以将不同的排程器指派给每个“出版者”来解决这个问题。xkrw2x1b2#
我会简化@帕特里克Hooijer的答案,并使用
CountDownLatch
,这是并发库的完美元素,它在值更改为零时等待。您只需调用.await(),它就等待Flux在
doOnComplete
方法中调用countDown
。遗憾的是,没有内置的API用于此目的
这是我的基本行为,因此,如果主线程即将关闭,我们将不必等待任何结果,因此,通量可能比仅仅抛出数字更耗时,在这种情况下,我们可能无法关闭主线程。
当然,这取决于创建者对该库的愿景。
szqfcxe23#
正如我在你的问题中所看到的,你想要一些其他的api来代替blockLast,对吗?据我所知,你必须自己实现这个。没有内置的api来实现这个目的。
aelbi1ox4#