io.reactivex.Observable.subscribe()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(376)

本文整理了Java中io.reactivex.Observable.subscribe()方法的一些代码示例,展示了Observable.subscribe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.subscribe()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:subscribe

Observable.subscribe介绍

[英]Subscribes to an ObservableSource and ignores onNext and onComplete emissions.

If the Observable emits an error, it is wrapped into an io.reactivex.exceptions.OnErrorNotImplementedExceptionand routed to the RxJavaPlugins.onError handler. Scheduler: subscribe does not operate by default on a particular Scheduler.
[中]订阅一个可观察的资源,忽略下一个和未完成的排放。
如果可观测对象发出错误,它将被包装到io中。reactivex。例外。OnErrorNotImplementedException并路由到RxJavaPlugins。一个错误处理程序。调度程序:订阅默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Observer<? super R> observer) {
  if (!ScalarXMapZHelper.tryAsMaybe(source, mapper, observer)) {
    source.subscribe(new SwitchMapMaybeMainObserver<T, R>(observer, mapper, delayErrors));
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t) {
    Observable<Integer> o = Observable.just(t)
        .subscribeOn(sch)
    ;
    Subject<Integer> subject = UnicastSubject.create();
    o.subscribe(subject);
    return subject;
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Override
protected void subscribeActual(Observer<? super R> observer) {
  if (!ScalarXMapZHelper.tryAsSingle(source, mapper, observer)) {
    source.subscribe(new SwitchMapSingleMainObserver<T, R>(observer, mapper, delayErrors));
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testUnsubscribeSource() throws Exception {
  Action unsubscribe = mock(Action.class);
  Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).cache();
  o.subscribe();
  o.subscribe();
  o.subscribe();
  verify(unsubscribe, never()).run();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithExceptionsFromKeySelector() {
  Observable<String> src = Observable.just("a", "b", null, "c");
  src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onError(any(NullPointerException.class));
  inOrder.verify(w, never()).onNext(anyString());
  inOrder.verify(w, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void testDeferFunctionThrows() throws Exception {
    Callable<Observable<String>> factory = mock(Callable.class);

    when(factory.call()).thenThrow(new TestException());

    Observable<String> result = Observable.defer(factory);

    Observer<String> o = TestHelper.mockObserver();

    result.subscribe(o);

    verify(o).onError(any(TestException.class));
    verify(o, never()).onNext(any(String.class));
    verify(o, never()).onComplete();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testCast() {
  Observable<?> source = Observable.just(1, 2);
  Observable<Integer> observable = source.cast(Integer.class);
  Observer<Integer> observer = TestHelper.mockObserver();
  observable.subscribe(observer);
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(1);
  verify(observer, never()).onError(
      any(Throwable.class));
  verify(observer, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testCombineLatest3TypesB() {
  Function3<String, Integer, int[], String> combineLatestFunction = getConcatStringIntegerIntArrayCombineLatestFunction();
  /* define an Observer to receive aggregated events */
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> w = Observable.combineLatest(Observable.just("one"), Observable.just(2), Observable.just(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction);
  w.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
  verify(observer, times(1)).onNext("one2[4, 5, 6]");
  verify(observer, times(1)).onNext("one2[7, 8]");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testIsEmptyWithTwoItemsObservable() {
  Observable<Integer> w = Observable.just(1, 2);
  Observable<Boolean> observable = w.isEmpty().toObservable();
  Observer<Boolean> observer = TestHelper.mockObserver();
  observable.subscribe(observer);
  verify(observer, never()).onNext(true);
  verify(observer, times(1)).onNext(false);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testIsEmptyWithOneItemObservable() {
  Observable<Integer> w = Observable.just(1);
  Observable<Boolean> observable = w.isEmpty().toObservable();
  Observer<Boolean> observer = TestHelper.mockObserver();
  observable.subscribe(observer);
  verify(observer, never()).onNext(true);
  verify(observer, times(1)).onNext(false);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDematerialize1() {
  Observable<Notification<Integer>> notifications = Observable.just(1, 2).materialize();
  Observable<Integer> dematerialize = notifications.dematerialize();
  Observer<Integer> observer = TestHelper.mockObserver();
  dematerialize.subscribe(observer);
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(2);
  verify(observer, times(1)).onComplete();
  verify(observer, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSkipAndCountGaplessBuffers() {
  Observable<String> source = Observable.just("one", "two", "three", "four", "five");
  Observable<List<String>> buffered = source.buffer(3, 3);
  buffered.subscribe(observer);
  InOrder inOrder = Mockito.inOrder(observer);
  inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three"));
  inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five"));
  inOrder.verify(observer, Mockito.never()).onNext(Mockito.<String>anyList());
  inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
  inOrder.verify(observer, Mockito.times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testLongTimeAction() throws InterruptedException {
  final CountDownLatch latch = new CountDownLatch(1);
  LongTimeAction action = new LongTimeAction(latch);
  Observable.just(1).buffer(10, TimeUnit.MILLISECONDS, 10)
      .subscribe(action);
  latch.await();
  assertFalse(action.fail);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFirstWithOneElementObservable() {
  Observable<Integer> o = Observable.just(1).firstElement().toObservable();
  Observer<Integer> observer = TestHelper.mockObserver();
  o.subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onNext(1);
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithSizeSkipTake1() {
  Observable<Integer> source = Observable.just(1).repeat();
  Observable<List<Integer>> result = source.buffer(2, 3).take(1);
  Observer<Object> o = TestHelper.mockObserver();
  result.subscribe(o);
  verify(o).onNext(Arrays.asList(1, 1));
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testConcat() {
  Observer<String> observer = TestHelper.mockObserver();
  final String[] o = { "1", "3", "5", "7" };
  final String[] e = { "2", "4", "6" };
  final Observable<String> odds = Observable.fromArray(o);
  final Observable<String> even = Observable.fromArray(e);
  Observable<String> concat = Observable.concat(odds, even);
  concat.subscribe(observer);
  verify(observer, times(7)).onNext(anyString());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void asyncFusedRejected() {
  TestObserver<Integer> to0 = ObserverFusion.newTest(QueueFuseable.ASYNC);
  Observable.range(1, 5)
  .doAfterNext(afterNext)
  .subscribe(to0);
  ObserverFusion.assertFusion(to0, QueueFuseable.NONE)
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSimple2() {
  Observable.range(1, 100).concatMapEager(toRange).subscribe(to);
  to.assertNoErrors();
  to.assertValueCount(200);
  to.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void otherError() {
  final TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(1, 5)
  .concatWith(Completable.error(new TestException()))
  .subscribe(to);
  to.assertFailure(TestException.class, 1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void testTake() {
    TestObserver<Integer> to = new TestObserver<Integer>();

    ObservableCache<Integer> cached = new ObservableCache<Integer>(Observable.range(1, 1000), 16);
    cached.take(10).subscribe(to);

    to.assertNoErrors();
    to.assertComplete();
    to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//        ts.assertUnsubscribed(); // FIXME no longer valid
    assertFalse(cached.hasObservers());
  }

相关文章

Observable类方法