reactor.core.publisher.Flux.onErrorResume()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.7k)|赞(0)|评价(0)|浏览(544)

本文整理了Java中reactor.core.publisher.Flux.onErrorResume()方法的一些代码示例,展示了Flux.onErrorResume()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.onErrorResume()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:onErrorResume

Flux.onErrorResume介绍

[英]Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.
[中]当出现与给定类型匹配的错误时,订阅回退发布服务器,使用函数根据错误选择回退。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Transform any error emitted by this {@link Flux} by synchronously applying a function to it.
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorMapForFlux.svg" alt="">
 *
 * @param mapper the error transforming {@link Function}
 *
 * @return a {@link Flux} that transforms source errors to other errors
 */
public final Flux<T> onErrorMap(Function<? super Throwable, ? extends Throwable> mapper) {
  return onErrorResume(e -> Mono.error(mapper.apply(e)));
}

代码示例来源:origin: reactor/reactor-core

/**
 * Simply emit a captured fallback value when any error is observed on this {@link Flux}.
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorReturnForFlux.svg" alt="">
 *
 * @param fallbackValue the value to emit if an error occurs
 *
 * @return a new falling back {@link Flux}
 */
public final Flux<T> onErrorReturn(T fallbackValue) {
  return onErrorResume(t -> just(fallbackValue));
}

代码示例来源:origin: spring-projects/spring-framework

private static Mono<Void> consumeAndCancel(ReactiveHttpInputMessage message) {
  return message.getBody()
      .map(buffer -> {
        DataBufferUtils.release(buffer);
        throw new ReadCancellationException();
      })
      .onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
      .then();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Simply emit a captured fallback value when an error matching the given predicate is
 * observed on this {@link Flux}.
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorReturnForFlux.svg" alt="">
 *
 * @param predicate the error predicate to match
 * @param fallbackValue the value to emit if an error occurs that matches the predicate
 *
 * @return a new falling back {@link Flux}
 */
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue) {
  return onErrorResume(predicate, t -> just(fallbackValue));
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 *
 * @param request the already prepared {@link SearchRequest} ready to be executed.
 * @return a {@link Flux} emitting the result of the operation.
 */
protected Flux<SearchHit> doFind(SearchRequest request) {
  if (QUERY_LOGGER.isDebugEnabled()) {
    QUERY_LOGGER.debug("Executing doFind: {}", request);
  }
  return Flux.from(execute(client -> client.search(request))) //
      .onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

/**
 * Customization hook on the actual execution result {@link Publisher}. <br />
 *
 * @param request the already prepared {@link SearchRequest} ready to be executed.
 * @return a {@link Flux} emitting the result of the operation.
 */
protected Flux<SearchHit> doScan(SearchRequest request) {
  if (QUERY_LOGGER.isDebugEnabled()) {
    QUERY_LOGGER.debug("Executing doScan: {}", request);
  }
  return Flux.from(execute(client -> client.scroll(request))) //
      .onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@SuppressWarnings("unchecked")
private <T> Mono<T> consumeAndCancel() {
  return (Mono<T>) this.response.getBody()
      .map(buffer -> {
        DataBufferUtils.release(buffer);
        throw new ReadCancellationException();
      })
      .onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
      .then();
}

代码示例来源:origin: resilience4j/resilience4j

default Flux<? super O> onErrorResume(Flux<? super O> flux) {
  return flux.onErrorResume(t -> {
    O fallbackValue;
    try {
      fallbackValue = apply(t);
    } catch (Exception e) {
      return Flux.error(e);
    }
    return Flux.just(fallbackValue);
  });
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

@Override
public Mono<Boolean> ping(HttpHeaders headers) {
  return sendRequest(new MainRequest(), RequestCreator.ping(), RawActionResponse.class, headers) //
      .map(response -> response.statusCode().is2xxSuccessful()) //
      .onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
}

代码示例来源:origin: spring-projects/spring-framework

@Override
  public Mono<Void> handle(WebSocketSession session) {
    return session.send(Flux
        .error(new Throwable())
        .onErrorResume(ex -> session.close(CloseStatus.GOING_AWAY)) // SPR-17306 (nested close)
        .cast(WebSocketMessage.class));
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorFiltered() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>error(new RuntimeException("forced failure")).onErrorResume(e -> e.getMessage()
                                            .equals("forced failure"),
      v -> Mono.just(2))
                                .subscribe(ts);
  ts.assertValues(2)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onErrorResumeErrorPredicateNot() {
  StepVerifier.create(Flux.<Integer>error(new TestException())
      .onErrorResume(RuntimeException.class, e -> Mono.just(1)))
      .verifyError(TestException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void switchFromIterableError() {
  StepVerifier.create(Flux.range(1, 1000)
              .map(t -> {
                if (t == 3) {
                  throw new RuntimeException("test");
                }
                return t;
              })
              .onErrorResume(e -> Flux.range(9999, 4)))
        .expectNext(1, 2, 9999, 10000, 10001, 10002)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextFactoryThrows() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.<Integer>error(new RuntimeException("forced failure")).onErrorResume(v -> {
    throw new RuntimeException("forced failure 2");
  })
                                .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorWith(e -> Assert.assertTrue(e.getMessage()
                        .contains("forced failure 2")));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onErrorResumeErrorPredicate() {
  StepVerifier.create(Flux.<Integer>error(new TestException())
      .onErrorResume(TestException.class, e -> Mono.just(1)))
      .expectNext(1)
      .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextFactoryReturnsNull() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.<Integer>error(new RuntimeException("forced failure")).onErrorResume(v -> null)
                                .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(NullPointerException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void error() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>error(new RuntimeException("forced failure")).onErrorResume(v -> Flux.range(
      11,
      10))
                                .subscribe(ts);
  ts.assertValues(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorPropagated() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Exception exception = new NullPointerException("forced failure");
  Flux.<Integer>error(exception).onErrorResume(v -> {
    throw Exceptions.propagate(v);
  })
                 .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertErrorWith(e -> Assert.assertSame(exception, e));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .onErrorResume(v -> Flux.range(11, 10))
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorHandlingOnErrorResume() {
  Flux<String> flux =
      Flux.just("key1", "key2")
        .flatMap(k ->
            callExternalService(k) // <1>
                .onErrorResume(e -> getFromCache(k)) // <2>
        );
  StepVerifier.create(flux)
        .expectNext("value1", "outdatedkey2")
        .verifyComplete();
}

相关文章

Flux类方法