本文整理了Java中io.reactivex.Observable.doOnError()
方法的一些代码示例,展示了Observable.doOnError()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnError()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnError
[英]Modifies the source ObservableSource so that it invokes an action if it calls onError.
In case the onError action throws, the downstream will receive a composite exception containing the original exception and the exception thrown by onError.
Scheduler: doOnError does not operate by default on a particular Scheduler.
[中]修改源ObservableSource,以便在调用onError时调用操作。
如果OneRor操作抛出,下游将收到一个复合异常,其中包含原始异常和OneRor抛出的异常。
计划程序:默认情况下,doOnError不会在特定计划程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnErrorNull() {
just1.doOnError(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testMapWithError() {
Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");
Observable<String> m = w.map(new Function<String, String>() {
@Override
public String apply(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
return s;
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t1) {
t1.printStackTrace();
}
});
m.subscribe(stringObserver);
verify(stringObserver, times(1)).onNext("one");
verify(stringObserver, never()).onNext("two");
verify(stringObserver, never()).onNext("three");
verify(stringObserver, never()).onComplete();
verify(stringObserver, times(1)).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testOnErrorCalledOnScheduler() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
Observable.<String>error(new Exception())
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
thread.set(Thread.currentThread());
latch.countDown();
}
})
.onErrorResumeNext(Observable.<String>empty())
.subscribe();
latch.await();
assertNotEquals(Thread.currentThread(), thread.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDoOnError() {
final AtomicReference<Throwable> r = new AtomicReference<Throwable>();
Throwable t = null;
try {
Observable.<String> error(new RuntimeException("an error"))
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable v) {
r.set(v);
}
}).blockingSingle();
fail("expected exception, not a return value");
} catch (Throwable e) {
t = e;
}
assertNotNull(t);
assertEquals(t, r.get());
}
代码示例来源:origin: ReactiveX/RxJava
.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();
代码示例来源:origin: ReactiveX/RxJava
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable e) {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onErrorThrows() {
TestObserver<Object> to = TestObserver.create();
Observable.error(new TestException())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable e) {
throw new TestException();
}
}).subscribe(to);
to.assertNoValues();
to.assertNotComplete();
to.assertError(CompositeException.class);
CompositeException ex = (CompositeException)to.errors().get(0);
List<Throwable> exceptions = ex.getExceptions();
assertEquals(2, exceptions.size());
Assert.assertTrue(exceptions.get(0) instanceof TestException);
Assert.assertTrue(exceptions.get(1) instanceof TestException);
}
代码示例来源:origin: ReactiveX/RxJava
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t1) {
Thread.sleep(100);
interval
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t1) {
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
final List<String> events = new ArrayList<String>();
final Callable<Resource> resourceFactory = createResourceFactory(events);
final Consumer<Throwable> onError = createOnErrorAction(events);
final Action unsub = createUnsubAction(events);
Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
@Override
public Observable<String> apply(Resource resource) {
return Observable.fromArray(resource.getTextFromWeb().split(" "))
.concatWith(Observable.<String>error(new RuntimeException()));
}
};
Observer<String> observer = TestHelper.mockObserver();
Observable<String> o = Observable.using(resourceFactory, observableFactory,
new DisposeAction(), false)
.doOnDispose(unsub)
.doOnError(onError);
o.safeSubscribe(observer);
assertEquals(Arrays.asList("error", /* "unsub",*/ "disposed"), events);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUsingDisposesEagerlyBeforeError() {
final List<String> events = new ArrayList<String>();
Callable<Resource> resourceFactory = createResourceFactory(events);
final Consumer<Throwable> onError = createOnErrorAction(events);
final Action unsub = createUnsubAction(events);
Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
@Override
public Observable<String> apply(Resource resource) {
return Observable.fromArray(resource.getTextFromWeb().split(" "))
.concatWith(Observable.<String>error(new RuntimeException()));
}
};
Observer<String> observer = TestHelper.mockObserver();
Observable<String> o = Observable.using(resourceFactory, observableFactory,
new DisposeAction(), true)
.doOnDispose(unsub)
.doOnError(onError);
o.safeSubscribe(observer);
assertEquals(Arrays.asList("disposed", "error" /*, "unsub"*/), events);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onErrorOnErrorCrashConditional() {
TestObserver<Object> to = Observable.error(new TestException("Outer"))
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
throw new TestException("Inner");
}
})
.filter(Functions.alwaysTrue())
.test()
.assertFailure(CompositeException.class);
List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
TestHelper.assertError(errors, 0, TestException.class, "Outer");
TestHelper.assertError(errors, 1, TestException.class, "Inner");
}
代码示例来源:origin: apollographql/apollo-android
@Test public void httpException() throws Exception {
server.enqueue(new MockResponse().setResponseCode(401).setBody("Unauthorized request!"));
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final AtomicReference<String> errorResponse = new AtomicReference<>();
Rx2Apollo
.from(apolloClient.query(emptyQuery))
.doOnError(new Consumer<Throwable>() {
@Override public void accept(Throwable throwable) throws Exception {
errorRef.set(throwable);
errorResponse.set(((ApolloHttpException) throwable).rawResponse().body().string());
}
})
.test()
.awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
.assertError(ApolloHttpException.class);
ApolloHttpException e = (ApolloHttpException) errorRef.get();
assertThat(e.code()).isEqualTo(401);
assertThat(e.message()).isEqualTo("Client Error");
assertThat(errorResponse.get()).isEqualTo("Unauthorized request!");
assertThat(e.getMessage()).isEqualTo("HTTP 401 Client Error");
}
代码示例来源:origin: Polidea/RxAndroidBle
.doOnError(throwable -> showNotification("Could not parse input: " + throwable))
.retryWhen(errorNotificationHandler -> errorNotificationHandler),
sharedNotifyButtonClicks.compose(onSubscribeSetText(notifyButton, R.string.setup_notification)),
代码示例来源:origin: Piasy/RxAndroidAudio
/**
* play audio from local file. should be scheduled in IO thread.
*/
public Observable<Boolean> play(@NonNull final PlayConfig config) {
if (!config.isArgumentValid()) {
return Observable.error(new IllegalArgumentException(""));
}
return Observable.<Boolean>create(emitter -> {
MediaPlayer player = create(config);
setMediaPlayerListener(player, emitter);
player.setVolume(config.mLeftVolume, config.mRightVolume);
player.setAudioStreamType(config.mStreamType);
player.setLooping(config.mLooping);
if (config.needPrepare()) {
player.prepare();
}
player.start();
mPlayer = player;
emitter.onNext(true);
}).doOnError(e -> stopPlay());
}
代码示例来源:origin: Piasy/RxAndroidAudio
/**
* prepare audio from local file. should be scheduled in IO thread.
*/
public Observable<Boolean> prepare(@NonNull final PlayConfig config) {
if (!config.isArgumentValid() || !config.isLocalSource()) {
return Observable.error(new IllegalArgumentException(""));
}
return Observable.<Boolean>create(emitter -> {
MediaPlayer player = create(config);
setMediaPlayerListener(player, emitter);
player.setVolume(config.mLeftVolume, config.mRightVolume);
player.setAudioStreamType(config.mStreamType);
player.setLooping(config.mLooping);
if (config.needPrepare()) {
player.prepare();
}
mPlayer = player;
emitter.onNext(true);
}).doOnError(e -> stopPlay());
}
代码示例来源:origin: chat-sdk/chat-sdk-android
/**
/* Convenience method to save the message to the database then pass it to the token network adapter
* send method so it can be sent via the network
*/
public Observable<MessageSendProgress> implSendMessage(final Message message) {
return Observable.create((ObservableOnSubscribe<MessageSendProgress>) e -> {
message.update();
message.getThread().update();
if (ChatSDK.encryption() != null) {
ChatSDK.encryption().encrypt(message);
}
e.onNext(new MessageSendProgress(message));
e.onComplete();
}).concatWith(sendMessage(message))
.subscribeOn(Schedulers.single()).doOnComplete(() -> {
message.setMessageStatus(MessageSendStatus.Sent);
message.update();
}).doOnError(throwable -> {
message.setMessageStatus(MessageSendStatus.Failed);
message.update();
});
}
代码示例来源:origin: io.reactivex/rxjavafx
/**
* Performs a given action on a Throwable on the FX thread in the event of an onError
* @param onError
* @param <T>
*/
public static <T> ObservableTransformer<T,T> doOnErrorFx(Consumer<Throwable> onError) {
return obs -> obs.doOnError(e -> runOnFx(e,onError));
}
代码示例来源:origin: io.reactivex.rxjava2/rxjavafx
/**
* Performs a given action on a Throwable on the FX thread in the event of an onError
* @param onError
* @param <T>
*/
public static <T> ObservableTransformer<T,T> doOnErrorFx(Consumer<Throwable> onError) {
return obs -> obs.doOnError(e -> runOnFx(e,onError));
}
代码示例来源:origin: spotify/mobius
@Override
public Observable<E> apply(Observable<F> effects) {
return effects
.ofType(effectClass)
.compose(effectHandler)
.doOnError(onErrorFunction.apply(effectHandler));
}
});
代码示例来源:origin: akarnokd/akarnokd-misc
@Test
public void test() {
Observable.just(1)
.flatMap(v -> single(v)
.toObservable()
.doOnError(w -> System.out.println("Error2 " + w))
)
.subscribe(v -> System.out.println(v), e -> System.out.println("Error " + e));
}
内容来源于网络,如有侵权,请联系作者删除!