io.reactivex.Observable.concatArrayEagerDelayError()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(3.6k)|赞(0)|评价(0)|浏览(88)

本文整理了Java中io.reactivex.Observable.concatArrayEagerDelayError()方法的一些代码示例,展示了Observable.concatArrayEagerDelayError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatArrayEagerDelayError()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:concatArrayEagerDelayError

Observable.concatArrayEagerDelayError介绍

[英]Concatenates an array of ObservableSources eagerly into a single stream of values and delaying any errors until all sources terminate.

Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them in order, each one after the previous one completes. Scheduler: This method does not operate by default on a particular Scheduler.
[中]将一组可观测资源急切地连接成一个值流,并延迟任何错误,直到所有源终止。
即时连接意味着一旦订户订阅,该操作符订阅所有源可观测资源。运算符缓冲这些可观察资源发出的值,然后依次将其耗尽,每一个都在前一个完成之后。调度器:默认情况下,该方法不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

/**
 * Concatenates an array of {@link ObservableSource}s eagerly into a single stream of values
 * and delaying any errors until all sources terminate.
 * <p>
 * <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEagerDelayError.png" alt="">
 * <p>
 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
 * source {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s
 * and then drains them in order, each one after the previous one completes.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param sources an array of {@code ObservableSource}s that need to be eagerly concatenated
 * @return the new Observable instance with the specified concatenation behavior
 * @since 2.2.1 - experimental
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayEagerDelayError(ObservableSource<? extends T>... sources) {
  return concatArrayEagerDelayError(bufferSize(), bufferSize(), sources);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void arrayDelayErrorDefault() {
  PublishSubject<Integer> ps1 = PublishSubject.create();
  PublishSubject<Integer> ps2 = PublishSubject.create();
  PublishSubject<Integer> ps3 = PublishSubject.create();
  @SuppressWarnings("unchecked")
  TestObserver<Integer> to = Observable.concatArrayEagerDelayError(ps1, ps2, ps3)
  .test();
  to.assertEmpty();
  assertTrue(ps1.hasObservers());
  assertTrue(ps2.hasObservers());
  assertTrue(ps3.hasObservers());
  ps2.onNext(2);
  ps2.onComplete();
  to.assertEmpty();
  ps1.onNext(1);
  to.assertValuesOnly(1);
  ps1.onComplete();
  to.assertValuesOnly(1, 2);
  ps3.onComplete();
  to.assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void arrayDelayErrorMaxConcurrency() {
  PublishSubject<Integer> ps1 = PublishSubject.create();
  PublishSubject<Integer> ps2 = PublishSubject.create();
  PublishSubject<Integer> ps3 = PublishSubject.create();
  @SuppressWarnings("unchecked")
  TestObserver<Integer> to = Observable.concatArrayEagerDelayError(2, 2, ps1, ps2, ps3)
  .test();
  to.assertEmpty();
  assertTrue(ps1.hasObservers());
  assertTrue(ps2.hasObservers());
  assertFalse(ps3.hasObservers());
  ps2.onNext(2);
  ps2.onComplete();
  to.assertEmpty();
  ps1.onNext(1);
  to.assertValuesOnly(1);
  ps1.onComplete();
  assertTrue(ps3.hasObservers());
  to.assertValuesOnly(1, 2);
  ps3.onComplete();
  to.assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

TestObserver<Integer> to = Observable.concatArrayEagerDelayError(2, 2, ps1, ps2, ps3)
.test();

相关文章

Observable类方法