本文整理了Java中io.reactivex.Observable.ignoreElements()
方法的一些代码示例,展示了Observable.ignoreElements()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.ignoreElements()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:ignoreElements
[英]Ignores all items emitted by the source ObservableSource and only calls onComplete or onError.
Scheduler: ignoreElements does not operate by default on a particular Scheduler.
[中]忽略源ObservableSource发出的所有项,只调用onComplete或onError。
调度器:默认情况下,ignoreElements不会在特定的调度器上运行。
代码示例来源:origin: jeasonlzy/okhttp-OkGo
@Override
public Completable adapt(Call<T> call, AdapterParam param) {
ObservableResponse<T> observable = new ObservableResponse<>();
return observable.adapt(call, param).ignoreElements();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.just(1).ignoreElements());
TestHelper.checkDisposed(Observable.just(1).ignoreElements().toObservable());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public CompletableSource apply(Integer v) throws Exception {
return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
}
}).toObservable()
代码示例来源:origin: ReactiveX/RxJava
@Override
public CompletableSource apply(Integer v) throws Exception {
return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testIgnoreElementsObservable() {
Observable<Integer> o = Observable.just(1, 2, 3).ignoreElements().toObservable();
Observer<Object> observer = TestHelper.mockObserver();
o.subscribe(observer);
verify(observer, never()).onNext(any(Integer.class));
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testIgnoreElements() {
Completable o = Observable.just(1, 2, 3).ignoreElements();
CompletableObserver observer = TestHelper.mockCompletableObserver();
o.subscribe(observer);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testWithNonEmpty() {
assertNull(Observable.just(1, 2, 3).ignoreElements().blockingGet());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUnsubscribesFromUpstreamObservable() {
final AtomicBoolean unsub = new AtomicBoolean();
Observable.range(1, 10).concatWith(Observable.<Integer>never())
.doOnDispose(new Action() {
@Override
public void run() {
unsub.set(true);
}})
.ignoreElements()
.toObservable()
.subscribe()
.dispose();
assertTrue(unsub.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUnsubscribesFromUpstream() {
final AtomicBoolean unsub = new AtomicBoolean();
Observable.range(1, 10).concatWith(Observable.<Integer>never())
.doOnDispose(new Action() {
@Override
public void run() {
unsub.set(true);
}})
.ignoreElements()
.subscribe()
.dispose();
assertTrue(unsub.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testWithNonEmptyObservable() {
assertTrue(Observable.just(1, 2, 3).ignoreElements().toObservable().isEmpty().blockingGet());
}
代码示例来源:origin: square/retrofit
return observable.ignoreElements();
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUpstreamIsProcessedButIgnored() {
final int num = 10;
final AtomicInteger upstreamCount = new AtomicInteger();
Object count = Observable.range(1, num)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
upstreamCount.incrementAndGet();
}
})
.ignoreElements()
.blockingGet();
assertEquals(num, upstreamCount.get());
assertNull(count);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testWithEmpty() {
assertNull(Observable.empty().ignoreElements().blockingGet());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCompletedOkObservable() {
TestObserver<Object> to = new TestObserver<Object>();
Observable.range(1, 10).ignoreElements().toObservable().subscribe(to);
to.assertNoErrors();
to.assertNoValues();
to.assertTerminated();
// FIXME no longer testable
// ts.assertUnsubscribed();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUpstreamIsProcessedButIgnoredObservable() {
final int num = 10;
final AtomicInteger upstreamCount = new AtomicInteger();
long count = Observable.range(1, num)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
upstreamCount.incrementAndGet();
}
})
.ignoreElements()
.toObservable()
.count().blockingGet();
assertEquals(num, upstreamCount.get());
assertEquals(0, count);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testWithEmptyObservable() {
assertTrue(Observable.empty().ignoreElements().toObservable().isEmpty().blockingGet());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testErrorReceivedObservable() {
TestObserver<Object> to = new TestObserver<Object>();
TestException ex = new TestException("boo");
Observable.error(ex).ignoreElements().toObservable().subscribe(to);
to.assertNoValues();
to.assertTerminated();
// FIXME no longer testable
// ts.assertUnsubscribed();
to.assertError(TestException.class);
to.assertErrorMessage("boo");
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCompletedOk() {
TestObserver<Object> to = new TestObserver<Object>();
Observable.range(1, 10).ignoreElements().subscribe(to);
to.assertNoErrors();
to.assertNoValues();
to.assertTerminated();
// FIXME no longer testable
// ts.assertUnsubscribed();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testErrorReceived() {
TestObserver<Object> to = new TestObserver<Object>();
TestException ex = new TestException("boo");
Observable.error(ex).ignoreElements().subscribe(to);
to.assertNoValues();
to.assertTerminated();
// FIXME no longer testable
// ts.assertUnsubscribed();
to.assertError(TestException.class);
to.assertErrorMessage("boo");
}
代码示例来源:origin: spring-projects/spring-framework
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
source -> (io.reactivex.Flowable<?>) source,
Flowable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
source -> ((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements()
);
}
}
内容来源于网络,如有侵权,请联系作者删除!