本文整理了Java中reactor.core.publisher.Flux.onErrorResume()
方法的一些代码示例,展示了Flux.onErrorResume()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.onErrorResume()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:onErrorResume
[英]Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.
[中]当出现与给定类型匹配的错误时,订阅回退发布服务器,使用函数根据错误选择回退。
代码示例来源:origin: reactor/reactor-core
/**
* Transform any error emitted by this {@link Flux} by synchronously applying a function to it.
* <p>
* <img class="marble" src="doc-files/marbles/onErrorMapForFlux.svg" alt="">
*
* @param mapper the error transforming {@link Function}
*
* @return a {@link Flux} that transforms source errors to other errors
*/
public final Flux<T> onErrorMap(Function<? super Throwable, ? extends Throwable> mapper) {
return onErrorResume(e -> Mono.error(mapper.apply(e)));
}
代码示例来源:origin: reactor/reactor-core
/**
* Simply emit a captured fallback value when any error is observed on this {@link Flux}.
* <p>
* <img class="marble" src="doc-files/marbles/onErrorReturnForFlux.svg" alt="">
*
* @param fallbackValue the value to emit if an error occurs
*
* @return a new falling back {@link Flux}
*/
public final Flux<T> onErrorReturn(T fallbackValue) {
return onErrorResume(t -> just(fallbackValue));
}
代码示例来源:origin: spring-projects/spring-framework
private static Mono<Void> consumeAndCancel(ReactiveHttpInputMessage message) {
return message.getBody()
.map(buffer -> {
DataBufferUtils.release(buffer);
throw new ReadCancellationException();
})
.onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
.then();
}
代码示例来源:origin: reactor/reactor-core
/**
* Simply emit a captured fallback value when an error matching the given predicate is
* observed on this {@link Flux}.
* <p>
* <img class="marble" src="doc-files/marbles/onErrorReturnForFlux.svg" alt="">
*
* @param predicate the error predicate to match
* @param fallbackValue the value to emit if an error occurs that matches the predicate
*
* @return a new falling back {@link Flux}
*/
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue) {
return onErrorResume(predicate, t -> just(fallbackValue));
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @return a {@link Flux} emitting the result of the operation.
*/
protected Flux<SearchHit> doFind(SearchRequest request) {
if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doFind: {}", request);
}
return Flux.from(execute(client -> client.search(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @return a {@link Flux} emitting the result of the operation.
*/
protected Flux<SearchHit> doScan(SearchRequest request) {
if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doScan: {}", request);
}
return Flux.from(execute(client -> client.scroll(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
代码示例来源:origin: spring-cloud/spring-cloud-gateway
@SuppressWarnings("unchecked")
private <T> Mono<T> consumeAndCancel() {
return (Mono<T>) this.response.getBody()
.map(buffer -> {
DataBufferUtils.release(buffer);
throw new ReadCancellationException();
})
.onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
.then();
}
代码示例来源:origin: resilience4j/resilience4j
default Flux<? super O> onErrorResume(Flux<? super O> flux) {
return flux.onErrorResume(t -> {
O fallbackValue;
try {
fallbackValue = apply(t);
} catch (Exception e) {
return Flux.error(e);
}
return Flux.just(fallbackValue);
});
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<Boolean> ping(HttpHeaders headers) {
return sendRequest(new MainRequest(), RequestCreator.ping(), RawActionResponse.class, headers) //
.map(response -> response.statusCode().is2xxSuccessful()) //
.onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(Flux
.error(new Throwable())
.onErrorResume(ex -> session.close(CloseStatus.GOING_AWAY)) // SPR-17306 (nested close)
.cast(WebSocketMessage.class));
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void errorFiltered() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.<Integer>error(new RuntimeException("forced failure")).onErrorResume(e -> e.getMessage()
.equals("forced failure"),
v -> Mono.just(2))
.subscribe(ts);
ts.assertValues(2)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void onErrorResumeErrorPredicateNot() {
StepVerifier.create(Flux.<Integer>error(new TestException())
.onErrorResume(RuntimeException.class, e -> Mono.just(1)))
.verifyError(TestException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void switchFromIterableError() {
StepVerifier.create(Flux.range(1, 1000)
.map(t -> {
if (t == 3) {
throw new RuntimeException("test");
}
return t;
})
.onErrorResume(e -> Flux.range(9999, 4)))
.expectNext(1, 2, 9999, 10000, 10001, 10002)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void nextFactoryThrows() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.<Integer>error(new RuntimeException("forced failure")).onErrorResume(v -> {
throw new RuntimeException("forced failure 2");
})
.subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertError(RuntimeException.class)
.assertErrorWith(e -> Assert.assertTrue(e.getMessage()
.contains("forced failure 2")));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void onErrorResumeErrorPredicate() {
StepVerifier.create(Flux.<Integer>error(new TestException())
.onErrorResume(TestException.class, e -> Mono.just(1)))
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void nextFactoryReturnsNull() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.<Integer>error(new RuntimeException("forced failure")).onErrorResume(v -> null)
.subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertError(NullPointerException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void error() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.<Integer>error(new RuntimeException("forced failure")).onErrorResume(v -> Flux.range(
11,
10))
.subscribe(ts);
ts.assertValues(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void errorPropagated() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Exception exception = new NullPointerException("forced failure");
Flux.<Integer>error(exception).onErrorResume(v -> {
throw Exceptions.propagate(v);
})
.subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertErrorWith(e -> Assert.assertSame(exception, e));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.onErrorResume(v -> Flux.range(11, 10))
.subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void errorHandlingOnErrorResume() {
Flux<String> flux =
Flux.just("key1", "key2")
.flatMap(k ->
callExternalService(k) // <1>
.onErrorResume(e -> getFromCache(k)) // <2>
);
StepVerifier.create(flux)
.expectNext("value1", "outdatedkey2")
.verifyComplete();
}
内容来源于网络,如有侵权,请联系作者删除!