Flux<Integer> flux2 = Flux.generate(AtomicInteger::new, (atomicInteger, synchronousSink) -> {
if (atomicInteger.get() == 10) {
synchronousSink.complete();
return atomicInteger;
}
synchronousSink.next(atomicInteger.getAndIncrement());
return atomicInteger;
}).cast(Integer.class)
.map(e -> {
if (e != 3) return e;
else {
try {
Thread.sleep(510L);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(interruptedException);
}
System.out.println("sleeping");
return e;
}
})
.timeout(Duration.ofMillis(400L))
.doOnError(System.out::println)
.retryWhen(Retry.max(2).transientErrors(false));
出于某种原因,这段代码在第二次重试时不会触发超时错误,可以很好地处理诸如flux.just(1,2,3,4)之类的静态数据,但在第一次重试后生成器不工作。
15:15:45.518 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
0
1
2
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
0
1
2
sleeping
15:15:46.129 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
sleeping
15:15:46.549 [parallel-3] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
0
1
2
sleeping
3
4
5
6
7
8
9
p、 s在特定线程上进行subcribing后,由于某种原因,它起了作用。
暂无答案!
目前还没有任何答案,快来回答吧!