Spring Reactor Flux,如何订阅和稍后阻止,直到所有完成

whhtz7ly  于 2022-12-02  发布在  Spring
关注(0)|答案(4)|浏览(261)

Spring reactor提供了blocklast(),如果我想保持同步并阻塞直到所有元素都完成
但是如果我想继续一点,然后阻塞,直到所有元素都完成了呢?
(我不想使用isDisposed执行忙碌等待)
我需要自己用onComplete触发的信号来做吗?还是有更好的内置API?

//reactor provides blocklast if I want to be synchronous and block until all elements are done
//Integer data = Flux.range(1,10).delayElements(Duration.ofSeconds(1)).doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName())).blockLast();
 
//but what if I want to continue a bit and then block
//do I need to do it myself like this? is there a better way?
Object signal = new Object(); 
Flux.range(1,10).delayElements(Duration.ofSeconds(1)).doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName())).doOnComplete(()->{synchronized(signal) { signal.notify();}}).subscribe();
// do some other work here, then wait for done
synchronized (signal) {
    signal.wait();
}
System.out.println("All done");
izj3ouym

izj3ouym1#

遗憾的是,没有内置的API来实现这个目的,但是有更好的方法来实现这个目的。如果Flux在你完成其他工作之前完成,你的Object.wait将无限期地挂起。你可以用多种方法来解决这个问题:

使用第二个发布服务器监视完成情况

这里我使用Sinks.Empty创建了第二个发布者,它的唯一目的是在最初的Flux完成时发出onComplete信号。我们将Sinks.Empty.emitEmpty()触发器放入Flux.doOnComplete()挂钩,然后使用.asMono().block()阻塞。如果您晚订阅,Mono将立即完成:

Sinks.Empty<Void> completionSink = Sinks.empty();

Flux.range(1,10)
        .delayElements(Duration.ofSeconds(1))
        .doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName()))
        .doOnComplete(() -> completionSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))
        .subscribe();

// do some other work here, then wait for done
completionSink.asMono().block();
System.out.println("All done");

将您的其他作品放到其他Publisher中

或者,您可以使用Mono.fromRunnable将其他工作放在第二个发布服务器中,然后使用Flux.merge同时订阅这两个发布服务器。Flux.merge将从这两个发布服务器发出所有项目,并且仅当这两个发布服务器都完成时才完成。由于Mono.fromRunnable不发出任何项目,因此生成的Flux将仅包含原始Flux的项目。

Flux<Integer> flux = Flux.range(1, 10)
        .delayElements(Duration.ofSeconds(1))
        .doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName()));
Mono<Void> otherWork = Mono.fromRunnable(() -> {
    // do some other work here
});
Flux.merge(flux, otherWork).blockLast();
System.out.println("All done");

请注意,Flux.merge会将两个“出版者”放在相同的排程器上,如果otherWork不是异步的,或原始的Flux永远无法完成,就会造成问题。您可以将不同的排程器指派给每个“出版者”来解决这个问题。

xkrw2x1b

xkrw2x1b2#

我会简化@帕特里克Hooijer的答案,并使用CountDownLatch,这是并发库的完美元素,它在值更改为零时等待。
您只需调用.await(),它就等待Flux在doOnComplete方法中调用countDown

CountDownLatch countDownLatch = new CountDownLatch(1);

    Flux.range(1,10)
            .delayElements(Duration.ofSeconds(1))
            .doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName()))
            .doOnComplete(countDownLatch::countDown)
            .subscribe();

    countDownLatch.await();

    System.out.println("All done");

遗憾的是,没有内置的API用于此目的
这是我的基本行为,因此,如果主线程即将关闭,我们将不必等待任何结果,因此,通量可能比仅仅抛出数字更耗时,在这种情况下,我们可能无法关闭主线程。
当然,这取决于创建者对该库的愿景。

szqfcxe2

szqfcxe23#

正如我在你的问题中所看到的,你想要一些其他的api来代替blockLast,对吗?据我所知,你必须自己实现这个。没有内置的api来实现这个目的。

aelbi1ox

aelbi1ox4#

Mono.zip(
                        Mono.fromCallable(() -> {
                            //something
                            System.out.println("Completed doing");
                            return 1;
                        }).delaySubscription(Duration.ofSeconds(12)).subscribeOn(Schedulers.boundedElastic()),
                        Flux.range(1, 10)
                                .delayElements(Duration.ofSeconds(1))
                                .doOnNext((Integer next) -> System.out.println(next + " on thread " + Thread.currentThread().getName()))
                                .last()
                )
                .subscribe(__ -> System.out.println("All Done"));

相关问题