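This article collects code examples for the Java method io.reactivex.Observable.subscribe() and shows how Observable.subscribe() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects; every example below is taken from the ReactiveX/RxJava repository. Details of Observable.subscribe() are as follows: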
Package path: io.reactivex.Observable
Class name: Observable
Method name: subscribe
Description: Subscribes to an ObservableSource and ignores onNext and onComplete emissions.
If the Observable emits an error, it is wrapped into an io.reactivex.exceptions.OnErrorNotImplementedException and routed to the RxJavaPlugins.onError handler. Scheduler: subscribe does not operate by default on a particular Scheduler.
This description applies to the no-argument overload. Most of the examples below call overloads that take an argument, such as an Observer.
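To make the contract described above concrete without pulling in the RxJava jar, here is a minimal stand-alone sketch. It is a model, not RxJava itself: MiniObservable, its Emitter interface, globalErrorHandler, and SubscribeContractSketch are hypothetical names introduced for illustration, and IllegalStateException stands in for OnErrorNotImplementedException. It models only three points from the description: items and completion are ignored, an error is wrapped and handed to a global hook, and the source runs on the thread that calls subscribe().

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SubscribeContractSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Hypothetical stand-in for installing an RxJavaPlugins.onError handler.
        MiniObservable.globalErrorHandler =
            e -> log.add("global handler: " + e.getCause().getMessage());

        String caller = Thread.currentThread().getName();
        MiniObservable<Integer> failing = new MiniObservable<>(emitter -> {
            // No scheduler is involved: the source runs on the subscribing thread.
            log.add("same thread: " + caller.equals(Thread.currentThread().getName()));
            emitter.onNext(1);                              // ignored by subscribe()
            emitter.onError(new RuntimeException("boom"));  // wrapped and rerouted
        });
        failing.subscribe();
        return log;
    }

    public static void main(String[] args) {
        // prints "same thread: true" then "global handler: boom"
        run().forEach(System.out::println);
    }
}

// Hypothetical stand-in for io.reactivex.Observable; NOT the RxJava implementation.
final class MiniObservable<T> {
    // Models the global error hook; does nothing until a handler is installed.
    static Consumer<Throwable> globalErrorHandler = e -> { };

    interface Emitter<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    private final Consumer<Emitter<T>> source;

    MiniObservable(Consumer<Emitter<T>> source) {
        this.source = source;
    }

    // Models the no-argument subscribe(): drop items and completion,
    // wrap any error and pass it to the global handler.
    void subscribe() {
        source.accept(new Emitter<T>() {
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable error) {
                globalErrorHandler.accept(
                    new IllegalStateException("onError not implemented", error));
            }
            @Override public void onComplete() { }
        });
    }
}
```

With real RxJava, the practical consequence is the same: calling subscribe() with no arguments on a source that can fail sends the failure to the global handler instead of to your own code, so an overload that accepts an error callback is usually the safer choice.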
Code example source: ReactiveX/RxJava
@Override
protected void subscribeActual(Observer<? super R> observer) {
    // Subscribe the full operator only if the scalar shortcut did not apply.
    if (!ScalarXMapZHelper.tryAsMaybe(source, mapper, observer)) {
        source.subscribe(new SwitchMapMaybeMainObserver<T, R>(observer, mapper, delayErrors));
    }
}
Code example source: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer t) {
    Observable<Integer> o = Observable.just(t)
            .subscribeOn(sch);
    // A Subject is also an Observer, so it can be passed to subscribe().
    Subject<Integer> subject = UnicastSubject.create();
    o.subscribe(subject);
    return subject;
}
Code example source: ReactiveX/RxJava
@Override
protected void subscribeActual(Observer<? super R> observer) {
    // Subscribe the full operator only if the scalar shortcut did not apply.
    if (!ScalarXMapZHelper.tryAsSingle(source, mapper, observer)) {
        source.subscribe(new SwitchMapSingleMainObserver<T, R>(observer, mapper, delayErrors));
    }
}
Code example source: ReactiveX/RxJava
@Test
public void testUnsubscribeSource() throws Exception {
    Action unsubscribe = mock(Action.class);
    Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).cache();
    o.subscribe();
    o.subscribe();
    o.subscribe();
    verify(unsubscribe, never()).run();
}
Code example source: ReactiveX/RxJava
@Test
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithExceptionsFromKeySelector() {
    Observable<String> src = Observable.just("a", "b", null, "c");
    src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
    InOrder inOrder = inOrder(w);
    inOrder.verify(w, times(1)).onNext("a");
    inOrder.verify(w, times(1)).onNext("b");
    inOrder.verify(w, times(1)).onError(any(NullPointerException.class));
    inOrder.verify(w, never()).onNext(anyString());
    inOrder.verify(w, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testDeferFunctionThrows() throws Exception {
    Callable<Observable<String>> factory = mock(Callable.class);
    when(factory.call()).thenThrow(new TestException());
    Observable<String> result = Observable.defer(factory);
    Observer<String> o = TestHelper.mockObserver();
    result.subscribe(o);
    verify(o).onError(any(TestException.class));
    verify(o, never()).onNext(any(String.class));
    verify(o, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testCast() {
    Observable<?> source = Observable.just(1, 2);
    Observable<Integer> observable = source.cast(Integer.class);
    Observer<Integer> observer = TestHelper.mockObserver();
    observable.subscribe(observer);
    // The source emits 1 and 2, so each value is verified once.
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(2);
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testCombineLatest3TypesB() {
    Function3<String, Integer, int[], String> combineLatestFunction = getConcatStringIntegerIntArrayCombineLatestFunction();
    /* define an Observer to receive aggregated events */
    Observer<String> observer = TestHelper.mockObserver();
    Observable<String> w = Observable.combineLatest(
            Observable.just("one"),
            Observable.just(2),
            Observable.just(new int[] { 4, 5, 6 }, new int[] { 7, 8 }),
            combineLatestFunction);
    w.subscribe(observer);
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
    verify(observer, times(1)).onNext("one2[4, 5, 6]");
    verify(observer, times(1)).onNext("one2[7, 8]");
}
Code example source: ReactiveX/RxJava
@Test
public void testIsEmptyWithTwoItemsObservable() {
    Observable<Integer> w = Observable.just(1, 2);
    Observable<Boolean> observable = w.isEmpty().toObservable();
    Observer<Boolean> observer = TestHelper.mockObserver();
    observable.subscribe(observer);
    verify(observer, never()).onNext(true);
    verify(observer, times(1)).onNext(false);
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testIsEmptyWithOneItemObservable() {
    Observable<Integer> w = Observable.just(1);
    Observable<Boolean> observable = w.isEmpty().toObservable();
    Observer<Boolean> observer = TestHelper.mockObserver();
    observable.subscribe(observer);
    verify(observer, never()).onNext(true);
    verify(observer, times(1)).onNext(false);
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testDematerialize1() {
    Observable<Notification<Integer>> notifications = Observable.just(1, 2).materialize();
    Observable<Integer> dematerialize = notifications.dematerialize();
    Observer<Integer> observer = TestHelper.mockObserver();
    dematerialize.subscribe(observer);
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(2);
    verify(observer, times(1)).onComplete();
    verify(observer, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testSkipAndCountGaplessBuffers() {
    Observable<String> source = Observable.just("one", "two", "three", "four", "five");
    Observable<List<String>> buffered = source.buffer(3, 3);
    buffered.subscribe(observer);
    InOrder inOrder = Mockito.inOrder(observer);
    inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three"));
    inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five"));
    inOrder.verify(observer, Mockito.never()).onNext(Mockito.<String>anyList());
    inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
    inOrder.verify(observer, Mockito.times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testLongTimeAction() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    LongTimeAction action = new LongTimeAction(latch);
    Observable.just(1).buffer(10, TimeUnit.MILLISECONDS, 10)
            .subscribe(action);
    latch.await();
    assertFalse(action.fail);
}
Code example source: ReactiveX/RxJava
@Test
public void testFirstWithOneElementObservable() {
    Observable<Integer> o = Observable.just(1).firstElement().toObservable();
    Observer<Integer> observer = TestHelper.mockObserver();
    o.subscribe(observer);
    InOrder inOrder = inOrder(observer);
    inOrder.verify(observer, times(1)).onNext(1);
    inOrder.verify(observer, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithSizeSkipTake1() {
    Observable<Integer> source = Observable.just(1).repeat();
    Observable<List<Integer>> result = source.buffer(2, 3).take(1);
    Observer<Object> o = TestHelper.mockObserver();
    result.subscribe(o);
    verify(o).onNext(Arrays.asList(1, 1));
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testConcat() {
    Observer<String> observer = TestHelper.mockObserver();
    final String[] o = { "1", "3", "5", "7" };
    final String[] e = { "2", "4", "6" };
    final Observable<String> odds = Observable.fromArray(o);
    final Observable<String> even = Observable.fromArray(e);
    Observable<String> concat = Observable.concat(odds, even);
    concat.subscribe(observer);
    verify(observer, times(7)).onNext(anyString());
}
Code example source: ReactiveX/RxJava
@Test
public void asyncFusedRejected() {
    TestObserver<Integer> to0 = ObserverFusion.newTest(QueueFuseable.ASYNC);
    Observable.range(1, 5)
            .doAfterNext(afterNext)
            .subscribe(to0);
    ObserverFusion.assertFusion(to0, QueueFuseable.NONE)
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values);
}
Code example source: ReactiveX/RxJava
@Test
public void testSimple2() {
    Observable.range(1, 100).concatMapEager(toRange).subscribe(to);
    to.assertNoErrors();
    to.assertValueCount(200);
    to.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void otherError() {
    final TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.range(1, 5)
            .concatWith(Completable.error(new TestException()))
            .subscribe(to);
    to.assertFailure(TestException.class, 1, 2, 3, 4, 5);
}
Code example source: ReactiveX/RxJava
@Test
public void testTake() {
    TestObserver<Integer> to = new TestObserver<Integer>();
    ObservableCache<Integer> cached = new ObservableCache<Integer>(Observable.range(1, 1000), 16);
    cached.take(10).subscribe(to);
    to.assertNoErrors();
    to.assertComplete();
    to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    // ts.assertUnsubscribed(); // FIXME no longer valid
    assertFalse(cached.hasObservers());
}