Usage and code examples of the io.reactivex.Observable.concat() method

x33g5p2x, reposted on 2022-01-25 in Other

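This article collects code examples of the io.reactivex.Observable.concat() method in Java and shows how Observable.concat() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.concat() method are as follows:
Package: io.reactivex
Class: Observable
Method: concat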

Introduction to Observable.concat

Returns an Observable that emits the items emitted by each of the ObservableSources emitted by the source ObservableSource, one after the other, without interleaving them.

Scheduler: concat does not operate by default on a particular Scheduler.
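
The "one after the other, without interleaving" contract can be illustrated with a small dependency-free model. This is only a sketch under simplifying assumptions: `ConcatModel`, `Source`, and the helper methods are hypothetical names invented for illustration, not RxJava types, and the model is purely synchronous with no error or disposal handling. It is not how RxJava implements concat.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Dependency-free model of concat's ordering; NOT RxJava's implementation. */
public class ConcatModel {

    /**
     * Hypothetical stand-in for a synchronous source: subscribe() pushes
     * every item to the consumer and returns once the source has completed.
     */
    public interface Source<T> {
        void subscribe(Consumer<? super T> onNext);
    }

    /** Builds a source that emits the given items in order when subscribed. */
    @SafeVarargs
    public static <T> Source<T> just(T... items) {
        return onNext -> {
            for (T item : items) {
                onNext.accept(item);
            }
        };
    }

    /** Subscribes to each inner source only after the previous one has completed. */
    public static <T> Source<T> concat(List<Source<T>> sources) {
        return onNext -> {
            for (Source<T> source : sources) {
                // In this model the call returns only when the source is done,
                // so items from different sources can never interleave.
                source.subscribe(onNext);
            }
        };
    }

    /** Collects everything a source emits into a list. */
    public static <T> List<T> toList(Source<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        Source<String> odds = just("1", "3", "5", "7");
        Source<String> even = just("2", "4", "6");
        // prints [1, 3, 5, 7, 2, 4, 6]
        System.out.println(toList(concat(Arrays.asList(odds, even))));
    }
}
```

In RxJava the same ordering holds for asynchronous sources: the next source is subscribed to only after the previous one completes. This is the difference from Observable.merge, which subscribes to all sources at once and may interleave their items.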

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void concatObservableNull() {
  Observable.concat((Observable<Observable<Object>>)null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void concatIterableNull() {
  Observable.concat((Iterable<Observable<Object>>)null);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void concatIterableOneIsNull() {
  Observable.concat(Arrays.asList(just1, null)).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void concat4() {
  Observable.concat(Observable.just(1), Observable.just(2),
      Observable.just(3), Observable.just(4))
  .test()
  .assertResult(1, 2, 3, 4);
}

Code example source: ReactiveX/RxJava

@Test
public void concat3() {
  Observable.concat(Observable.just(1), Observable.just(2), Observable.just(3))
  .test()
  .assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void concatIterableIteratorNull() {
  Observable.concat(new Iterable<Observable<Object>>() {
    @Override
    public Iterator<Observable<Object>> iterator() {
      return null;
    }
  }).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void testSkipError() {
  Exception e = new Exception();
  Observable<String> ok = Observable.just("one");
  Observable<String> error = Observable.error(e);
  Observable<String> skip = Observable.concat(ok, error).skip(100);
  Observer<String> observer = TestHelper.mockObserver();
  skip.subscribe(observer);
  verify(observer, never()).onNext(any(String.class));
  verify(observer, times(1)).onError(e);
  verify(observer, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
  TestScheduler testScheduler = new TestScheduler();
  Observable<Integer> source = Observable.concat(Observable.<Integer> error(new TestException()), Observable.just(1));
  Observer<Integer> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  source.observeOn(testScheduler).subscribe(o);
  inOrder.verify(o, never()).onError(any(TestException.class));
  testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  inOrder.verify(o).onError(any(TestException.class));
  inOrder.verify(o, never()).onNext(anyInt());
  inOrder.verify(o, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void workerNotDisposedPrematurelySyncInNormalOut() {
  DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  Observable.concat(
      Observable.just(1).observeOn(s),
      Observable.just(2)
  )
  .test()
  .assertResult(1, 2);
  assertEquals(1, s.disposedCount.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testConcat() {
  Observer<String> observer = TestHelper.mockObserver();
  final String[] o = { "1", "3", "5", "7" };
  final String[] e = { "2", "4", "6" };
  final Observable<String> odds = Observable.fromArray(o);
  final Observable<String> even = Observable.fromArray(e);
  Observable<String> concat = Observable.concat(odds, even);
  concat.subscribe(observer);
  verify(observer, times(7)).onNext(anyString());
}

Code example source: ReactiveX/RxJava

@Test
public void testAggregateAsIntSumSourceThrows() {
  Single<Integer> result = Observable.concat(Observable.just(1, 2, 3, 4, 5),
      Observable.<Integer> error(new TestException()))
      .reduce(0, sum).map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer v) {
          return v;
        }
      });
  result.subscribe(singleObserver);
  verify(singleObserver, never()).onSuccess(any());
  verify(singleObserver, times(1)).onError(any(TestException.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testConcatSimple() {
  Observable<String> o1 = Observable.just("one", "two");
  Observable<String> o2 = Observable.just("three", "four");
  List<String> values = Observable.concat(o1, o2).toList().blockingGet();
  assertEquals("one", values.get(0));
  assertEquals("two", values.get(1));
  assertEquals("three", values.get(2));
  assertEquals("four", values.get(3));
}

Code example source: ReactiveX/RxJava

@Test
public void testWithError3() {
  Single<Boolean> o = Observable.sequenceEqual(
      Observable.concat(Observable.just("one"),
          Observable.<String> error(new TestException())),
      Observable.concat(Observable.just("one"),
          Observable.<String> error(new TestException())));
  verifyError(o);
}

Code example source: ReactiveX/RxJava

@Test
public void workerNotDisposedPrematurelyNormalInNormalOut() {
  DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  Observable.concat(
      Observable.just(1).hide().observeOn(s),
      Observable.just(2)
  )
  .test()
  .assertResult(1, 2);
  assertEquals(1, s.disposedCount.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testWithError2() {
  Single<Boolean> o = Observable.sequenceEqual(
      Observable.just("one", "two", "three"),
      Observable.concat(Observable.just("one"),
          Observable.<String> error(new TestException())));
  verifyError(o);
}

Code example source: ReactiveX/RxJava

@Test
public void testWithError3Observable() {
  Observable<Boolean> o = Observable.sequenceEqual(
      Observable.concat(Observable.just("one"),
          Observable.<String> error(new TestException())),
      Observable.concat(Observable.just("one"),
          Observable.<String> error(new TestException()))).toObservable();
  verifyError(o);
}

Code example source: ReactiveX/RxJava

@Test
public void testWithError1() {
  Single<Boolean> o = Observable.sequenceEqual(
      Observable.concat(Observable.just("one"),
          Observable.<String> error(new TestException())),
      Observable.just("one", "two", "three"));
  verifyError(o);
}

Code example source: ReactiveX/RxJava

@Test
public void testWithError1Observable() {
  Observable<Boolean> o = Observable.sequenceEqual(
      Observable.concat(Observable.just("one"),
          Observable.<String> error(new TestException())),
      Observable.just("one", "two", "three")).toObservable();
  verifyError(o);
}

Code example source: ReactiveX/RxJava

@Test
public void testWithError2Observable() {
  Observable<Boolean> o = Observable.sequenceEqual(
      Observable.just("one", "two", "three"),
      Observable.concat(Observable.just("one"),
          Observable.<String> error(new TestException()))).toObservable();
  verifyError(o);
}

Code example source: ReactiveX/RxJava

@Test
public void testError2() {
  Observable<Integer> source = Observable.concat(Observable.just(0),
      Observable.<Integer> error(new TestException("Forced failure")));
  Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  TestObserver<Object> to = new TestObserver<Object>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  assertEquals(1, to.errorCount());
  to.assertValueCount(1);
}
