java reactor retrywhen和超时

zfycwa2u  于 2021-09-13  发布在  Java
关注(0)|答案(0)|浏览(290)
Flux<Integer> flux2 = Flux.generate(AtomicInteger::new, (atomicInteger, synchronousSink) -> {
            if (atomicInteger.get() == 10) {
                synchronousSink.complete();
                return atomicInteger;
            }
            synchronousSink.next(atomicInteger.getAndIncrement());

            return atomicInteger;
        }).cast(Integer.class)
                .map(e -> {
                    if (e != 3) return e;
                    else {
                        try {
                            Thread.sleep(510L);
                        } catch (InterruptedException interruptedException) {
                            throw new RuntimeException(interruptedException);
                        }
                        System.out.println("sleeping");
                        return e;
                    }
                })
                .timeout(Duration.ofMillis(400L))
                .doOnError(System.out::println)
                .retryWhen(Retry.max(2).transientErrors(false));

出于某种原因,这段代码在第二次重试时不会触发超时错误,可以很好地处理诸如flux.just(1,2,3,4)之类的静态数据,但在第一次重试后生成器不工作。

15:15:45.518 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
0
1
2
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
0
1
2
sleeping
15:15:46.129 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
sleeping

15:15:46.549 [parallel-3] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
0
1
2
sleeping
3
4
5
6
7
8
9

p、 s在特定线程上进行subcribing后,由于某种原因,它起了作用。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题