io.reactivex.Observable.doOnError()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.7k)|赞(0)|评价(0)|浏览(165)

本文整理了Java中io.reactivex.Observable.doOnError()方法的一些代码示例,展示了Observable.doOnError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnError()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnError

Observable.doOnError介绍

[英]Modifies the source ObservableSource so that it invokes an action if it calls onError.

In case the onError action throws, the downstream will receive a composite exception containing the original exception and the exception thrown by onError.

Scheduler: doOnError does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便在调用onError时调用操作。
如果OneRor操作抛出,下游将收到一个复合异常,其中包含原始异常和OneRor抛出的异常。
计划程序:默认情况下,doOnError不会在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnErrorNull() {
  just1.doOnError(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMapWithError() {
  Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");
  Observable<String> m = w.map(new Function<String, String>() {
    @Override
    public String apply(String s) {
      if ("fail".equals(s)) {
        throw new RuntimeException("Forced Failure");
      }
      return s;
    }
  }).doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable t1) {
      t1.printStackTrace();
    }
  });
  m.subscribe(stringObserver);
  verify(stringObserver, times(1)).onNext("one");
  verify(stringObserver, never()).onNext("two");
  verify(stringObserver, never()).onNext("three");
  verify(stringObserver, never()).onComplete();
  verify(stringObserver, times(1)).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOnErrorCalledOnScheduler() throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicReference<Thread> thread = new AtomicReference<Thread>();
  Observable.<String>error(new Exception())
      .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
      .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
          thread.set(Thread.currentThread());
          latch.countDown();
        }
      })
      .onErrorResumeNext(Observable.<String>empty())
      .subscribe();
  latch.await();
  assertNotEquals(Thread.currentThread(), thread.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDoOnError() {
  final AtomicReference<Throwable> r = new AtomicReference<Throwable>();
  Throwable t = null;
  try {
    Observable.<String> error(new RuntimeException("an error"))
    .doOnError(new Consumer<Throwable>() {
      @Override
      public void accept(Throwable v) {
        r.set(v);
      }
    }).blockingSingle();
    fail("expected exception, not a return value");
  } catch (Throwable e) {
    t = e;
  }
  assertNotNull(t);
  assertEquals(t, r.get());
}

代码示例来源:origin: ReactiveX/RxJava

.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();

代码示例来源:origin: ReactiveX/RxJava

.doOnError(new Consumer<Throwable>() {
  @Override
  public void accept(Throwable e) {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onErrorThrows() {
  TestObserver<Object> to = TestObserver.create();
  Observable.error(new TestException())
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) {
      throw new TestException();
    }
  }).subscribe(to);
  to.assertNoValues();
  to.assertNotComplete();
  to.assertError(CompositeException.class);
  CompositeException ex = (CompositeException)to.errors().get(0);
  List<Throwable> exceptions = ex.getExceptions();
  assertEquals(2, exceptions.size());
  Assert.assertTrue(exceptions.get(0) instanceof TestException);
  Assert.assertTrue(exceptions.get(1) instanceof TestException);
}

代码示例来源:origin: ReactiveX/RxJava

.doOnError(new Consumer<Throwable>() {
      @Override
      public void accept(Throwable t1) {
Thread.sleep(100);
interval
.doOnError(new Consumer<Throwable>() {
  @Override
  public void accept(Throwable t1) {

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
  final List<String> events = new ArrayList<String>();
  final Callable<Resource> resourceFactory = createResourceFactory(events);
  final Consumer<Throwable> onError = createOnErrorAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
    @Override
    public Observable<String> apply(Resource resource) {
      return Observable.fromArray(resource.getTextFromWeb().split(" "))
          .concatWith(Observable.<String>error(new RuntimeException()));
    }
  };
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> o = Observable.using(resourceFactory, observableFactory,
      new DisposeAction(), false)
  .doOnDispose(unsub)
  .doOnError(onError);
  o.safeSubscribe(observer);
  assertEquals(Arrays.asList("error", /* "unsub",*/ "disposed"), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUsingDisposesEagerlyBeforeError() {
  final List<String> events = new ArrayList<String>();
  Callable<Resource> resourceFactory = createResourceFactory(events);
  final Consumer<Throwable> onError = createOnErrorAction(events);
  final Action unsub = createUnsubAction(events);
  Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
    @Override
    public Observable<String> apply(Resource resource) {
      return Observable.fromArray(resource.getTextFromWeb().split(" "))
          .concatWith(Observable.<String>error(new RuntimeException()));
    }
  };
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> o = Observable.using(resourceFactory, observableFactory,
      new DisposeAction(), true)
  .doOnDispose(unsub)
  .doOnError(onError);
  o.safeSubscribe(observer);
  assertEquals(Arrays.asList("disposed", "error" /*, "unsub"*/), events);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onErrorOnErrorCrashConditional() {
  TestObserver<Object> to = Observable.error(new TestException("Outer"))
  .doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable e) throws Exception {
      throw new TestException("Inner");
    }
  })
  .filter(Functions.alwaysTrue())
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class, "Outer");
  TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

代码示例来源:origin: apollographql/apollo-android

@Test public void httpException() throws Exception {
 server.enqueue(new MockResponse().setResponseCode(401).setBody("Unauthorized request!"));
 final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 final AtomicReference<String> errorResponse = new AtomicReference<>();
 Rx2Apollo
   .from(apolloClient.query(emptyQuery))
   .doOnError(new Consumer<Throwable>() {
    @Override public void accept(Throwable throwable) throws Exception {
     errorRef.set(throwable);
     errorResponse.set(((ApolloHttpException) throwable).rawResponse().body().string());
    }
   })
   .test()
   .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
   .assertError(ApolloHttpException.class);
 ApolloHttpException e = (ApolloHttpException) errorRef.get();
 assertThat(e.code()).isEqualTo(401);
 assertThat(e.message()).isEqualTo("Client Error");
 assertThat(errorResponse.get()).isEqualTo("Unauthorized request!");
 assertThat(e.getMessage()).isEqualTo("HTTP 401 Client Error");
}

代码示例来源:origin: Polidea/RxAndroidBle

.doOnError(throwable -> showNotification("Could not parse input: " + throwable))
    .retryWhen(errorNotificationHandler -> errorNotificationHandler),
sharedNotifyButtonClicks.compose(onSubscribeSetText(notifyButton, R.string.setup_notification)),

代码示例来源:origin: Piasy/RxAndroidAudio

/**
 * play audio from local file. should be scheduled in IO thread.
 */
public Observable<Boolean> play(@NonNull final PlayConfig config) {
  if (!config.isArgumentValid()) {
    return Observable.error(new IllegalArgumentException(""));
  }
  return Observable.<Boolean>create(emitter -> {
    MediaPlayer player = create(config);
    setMediaPlayerListener(player, emitter);
    player.setVolume(config.mLeftVolume, config.mRightVolume);
    player.setAudioStreamType(config.mStreamType);
    player.setLooping(config.mLooping);
    if (config.needPrepare()) {
      player.prepare();
    }
    player.start();
    mPlayer = player;
    emitter.onNext(true);
  }).doOnError(e -> stopPlay());
}

代码示例来源:origin: Piasy/RxAndroidAudio

/**
 * prepare audio from local file. should be scheduled in IO thread.
 */
public Observable<Boolean> prepare(@NonNull final PlayConfig config) {
  if (!config.isArgumentValid() || !config.isLocalSource()) {
    return Observable.error(new IllegalArgumentException(""));
  }
  return Observable.<Boolean>create(emitter -> {
    MediaPlayer player = create(config);
    setMediaPlayerListener(player, emitter);
    player.setVolume(config.mLeftVolume, config.mRightVolume);
    player.setAudioStreamType(config.mStreamType);
    player.setLooping(config.mLooping);
    if (config.needPrepare()) {
      player.prepare();
    }
    mPlayer = player;
    emitter.onNext(true);
  }).doOnError(e -> stopPlay());
}

代码示例来源:origin: chat-sdk/chat-sdk-android

/**
/* Convenience method to save the message to the database then pass it to the token network adapter
 * send method so it can be sent via the network
 */
public Observable<MessageSendProgress> implSendMessage(final Message message) {
  return Observable.create((ObservableOnSubscribe<MessageSendProgress>) e -> {
    message.update();
    message.getThread().update();
    if (ChatSDK.encryption() != null) {
      ChatSDK.encryption().encrypt(message);
    }
    e.onNext(new MessageSendProgress(message));
    e.onComplete();
  }).concatWith(sendMessage(message))
      .subscribeOn(Schedulers.single()).doOnComplete(() -> {
        message.setMessageStatus(MessageSendStatus.Sent);
        message.update();
      }).doOnError(throwable -> {
        message.setMessageStatus(MessageSendStatus.Failed);
        message.update();
      });
}

代码示例来源:origin: io.reactivex/rxjavafx

/**
 * Performs a given action on a Throwable on the FX thread in the event of an onError
 * @param onError
 * @param <T>
 */
public static <T> ObservableTransformer<T,T> doOnErrorFx(Consumer<Throwable> onError) {
  return obs -> obs.doOnError(e -> runOnFx(e,onError));
}

代码示例来源:origin: io.reactivex.rxjava2/rxjavafx

/**
 * Performs a given action on a Throwable on the FX thread in the event of an onError
 * @param onError
 * @param <T>
 */
public static <T> ObservableTransformer<T,T> doOnErrorFx(Consumer<Throwable> onError) {
  return obs -> obs.doOnError(e -> runOnFx(e,onError));
}

代码示例来源:origin: spotify/mobius

@Override
 public Observable<E> apply(Observable<F> effects) {
  return effects
    .ofType(effectClass)
    .compose(effectHandler)
    .doOnError(onErrorFunction.apply(effectHandler));
 }
});

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void test() {
  Observable.just(1)
  .flatMap(v -> single(v)
      .toObservable()
      .doOnError(w -> System.out.println("Error2 " + w))
  )
  .subscribe(v -> System.out.println(v), e -> System.out.println("Error " + e));
}

相关文章

Observable类方法