Usage and code examples of the io.reactivex.Observable.concatMapSingle() method

Reposted by x33g5p2x on 2022-01-25, category: Other

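This article collects code examples for the Java method io.reactivex.Observable.concatMapSingle() and shows how Observable.concatMapSingle() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, were extracted from selected projects, and are intended as practical reference material. Details of the Observable.concatMapSingle() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: concatMapSingle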

Introduction to Observable.concatMapSingle

Maps each upstream item to a SingleSource and subscribes to these sources one at a time, moving on to the next only after the current one succeeds. It emits their success values, or terminates immediately if either this Observable or the current inner SingleSource fails.

Scheduler: concatMapSingle does not operate by default on a particular Scheduler.
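The ordering guarantee described above can be illustrated without RxJava. The following is a minimal plain-JDK sketch that uses CompletableFuture as a stand-in for SingleSource; the class ConcatMapSingleModel and the helpers concatMapSequential and slowLookup are hypothetical names invented for this illustration and are not part of RxJava. It blocks the calling thread, which the real operator does not do, so it models only the order of subscription and emission.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ConcatMapSingleModel {

    // Hypothetical helper (not RxJava): maps each item to an async source and
    // waits for it to finish before the mapper is called for the next item.
    public static <T, R> List<R> concatMapSequential(
            List<T> upstream, Function<T, CompletableFuture<R>> mapper) {
        List<R> emitted = new ArrayList<>();
        for (T item : upstream) {
            // join() blocks until the current inner source has produced its value.
            emitted.add(mapper.apply(item).join());
        }
        return emitted;
    }

    // Hypothetical inner source: earlier items deliberately take longer,
    // so any reordering would be visible in the output.
    public static CompletableFuture<String> slowLookup(int v) {
        final long delayMs = (4 - v) * 20L;
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "item-" + v;
        });
    }

    public static void main(String[] args) {
        List<String> result =
                concatMapSequential(Arrays.asList(1, 2, 3), ConcatMapSingleModel::slowLookup);
        System.out.println(result); // prints [item-1, item-2, item-3]
    }
}
```

Although the first lookup is the slowest, the results come out in upstream order because each inner source is started only after the previous one has finished.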

Code examples

Code example source: ReactiveX/RxJava

/**
 * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the
 * other succeeds, emits their success values or terminates immediately if
 * either this {@code Observable} or the current inner {@code SingleSource} fail.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * <p>History: 2.1.11 - experimental
 * @param <R> the result type of the inner {@code SingleSource}s
 * @param mapper the function called with the upstream item and should return
 *               a {@code SingleSource} to become the next source to
 *               be subscribed to
 * @return a new Observable instance
 * @see #concatMapSingleDelayError(Function)
 * @see #concatMapSingle(Function, int)
 * @since 2.2
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
  return concatMapSingle(mapper, 2);
}
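
The Javadoc above states that the operator terminates immediately when the current inner SingleSource fails. As a rough illustration of that fail-fast behavior, here is a plain-JDK sketch that again uses CompletableFuture in place of SingleSource. The names FailFastModel, Result, run, and demo are hypothetical and exist only for this example; the sketch is a blocking model, not how RxJava implements the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class FailFastModel {

    /** Values emitted before termination, plus the terminal error (null if none). */
    public static final class Result<R> {
        public final List<R> values = new ArrayList<>();
        public Throwable error;
    }

    // Hypothetical helper (not RxJava): processes items one by one and stops
    // at the first inner source that fails.
    public static <T, R> Result<R> run(
            List<T> upstream, Function<T, CompletableFuture<R>> mapper) {
        Result<R> result = new Result<>();
        for (T item : upstream) {
            try {
                result.values.add(mapper.apply(item).join());
            } catch (CompletionException e) {
                // join() wraps the inner failure; unwrap it and terminate.
                result.error = e.getCause();
                break;
            }
        }
        return result;
    }

    // Maps 1..4 to ten times their value, except that item 3 fails.
    // mappedLog records every item the mapper was actually called with.
    public static Result<Integer> demo(List<Integer> mappedLog) {
        return run(Arrays.asList(1, 2, 3, 4), v -> {
            mappedLog.add(v);
            CompletableFuture<Integer> inner = new CompletableFuture<>();
            if (v == 3) {
                inner.completeExceptionally(new IllegalStateException("boom"));
            } else {
                inner.complete(v * 10);
            }
            return inner;
        });
    }

    public static void main(String[] args) {
        List<Integer> mappedLog = new ArrayList<>();
        Result<Integer> r = demo(mappedLog);
        System.out.println("values=" + r.values + " mapped=" + mappedLog);
    }
}
```

In this model the values 10 and 20 are emitted, the failure of item 3 becomes the terminal error, and the mapper is never called for item 4. The concatMapSingleDelayError method referenced in the Javadoc exists for cases where this immediate termination is not wanted.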

Code example source: ReactiveX/RxJava

@Test
public void mapperCrashScalar() {
  TestObserver<Object> to = Observable.just(1)
  .concatMapSingle(new Function<Integer, SingleSource<? extends Object>>() {
    @Override
    public SingleSource<? extends Object> apply(Integer v)
        throws Exception {
      throw new TestException();
    }
  })
  .test();
  to.assertFailure(TestException.class);
}

Code example source: redisson/redisson

/**
 * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the
 * other succeeds, emits their success values or terminates immediately if
 * either this {@code Observable} or the current inner {@code SingleSource} fail.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <R> the result type of the inner {@code SingleSource}s
 * @param mapper the function called with the upstream item and should return
 *               a {@code SingleSource} to become the next source to
 *               be subscribed to
 * @return a new Observable instance
 * @since 2.1.11 - experimental
 * @see #concatMapSingleDelayError(Function)
 * @see #concatMapSingle(Function, int)
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <R> Observable<R> concatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
  return concatMapSingle(mapper, 2);
}

Code example source: ReactiveX/RxJava

@Test
public void disposed() {
  TestHelper.checkDisposed(Observable.just(1).hide()
      .concatMapSingle(Functions.justFunction(Single.never()))
  );
}

Code example source: ReactiveX/RxJava

@Test
public void innerError() {
  Observable.just(1)
  .concatMapSingle(Functions.justFunction(Single.error(new TestException())))
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void simple() {
  Observable.range(1, 5)
  .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> apply(Integer v)
        throws Exception {
      return Single.just(v);
    }
  })
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Test
public void take() {
  Observable.range(1, 5)
  .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> apply(Integer v)
        throws Exception {
      return Single.just(v);
    }
  })
  .take(3)
  .test()
  .assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void simpleLong() {
  Observable.range(1, 1024)
  .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> apply(Integer v)
        throws Exception {
      return Single.just(v);
    }
  }, 32)
  .test()
  .assertValueCount(1024)
  .assertNoErrors()
  .assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void mainError() {
  Observable.error(new TestException())
  .concatMapSingle(Functions.justFunction(Single.just(1)))
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void innerSuccessDisposeRace() {
  for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
    final SingleSubject<Integer> ss = SingleSubject.create();
    final TestObserver<Integer> to = Observable.just(1)
        .hide()
        .concatMapSingle(Functions.justFunction(ss))
        .test();
    Runnable r1 = new Runnable() {
      @Override
      public void run() {
        ss.onSuccess(1);
      }
    };
    Runnable r2 = new Runnable() {
      @Override
      public void run() {
        to.dispose();
      }
    };
    TestHelper.race(r1, r2);
    to.assertNoErrors();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void scalarEmptySource() {
  SingleSubject<Integer> ss = SingleSubject.create();
  Observable.empty()
  .concatMapSingle(Functions.justFunction(ss))
  .test()
  .assertResult();
  assertFalse(ss.hasObservers());
}

Code example source: ReactiveX/RxJava

@Test
public void cancel() {
  Observable.range(1, 5).concatWith(Observable.<Integer>never())
  .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> apply(Integer v)
        throws Exception {
      return Single.just(v);
    }
  })
  .test()
  .assertValues(1, 2, 3, 4, 5)
  .assertNoErrors()
  .assertNotComplete()
  .cancel();
}

Code example source: ReactiveX/RxJava

@Test
public void checkUnboundedInnerQueue() {
  SingleSubject<Integer> ss = SingleSubject.create();
  @SuppressWarnings("unchecked")
  TestObserver<Integer> to = Observable
      .fromArray(ss, Single.just(2), Single.just(3), Single.just(4))
      .concatMapSingle(Functions.<Single<Integer>>identity(), 2)
      .test();
  to.assertEmpty();
  ss.onSuccess(1);
  to.assertResult(1, 2, 3, 4);
}

Code example source: forkachild/reel-search-android

@Override
protected void onStart() {
  super.onStart();
  mDisposable.add(mDictionaryManager.loadDictionary()
      .doOnSubscribe(d -> {
        mBinding.txtQuery.setEnabled(false);
        mBinding.txtQuery.setHint("Loading dictionary");
      })
      .doOnComplete(() -> {
        mBinding.txtQuery.setEnabled(true);
        mBinding.txtQuery.setHint("Start typing");
      })
      .subscribe(() -> {
      }, Throwable::printStackTrace));
  mDisposable.add(RxUtils.onTextChange(mBinding.txtQuery)
      .filter(in -> mDictionaryManager.isLoaded())
      .concatMapSingle(in -> mDictionaryManager.query(in))
      .doOnNext(in -> mBinding.btnSelect.setEnabled(!in.isEmpty()))
      .subscribe(mAdapter::setData, Throwable::printStackTrace));
}

Code example source: akarnokd/akarnokd-misc

@Test
public void test() {
  TestScheduler testScheduler = new TestScheduler();

  final Single<List<Integer>> first = Single.timer(2, TimeUnit.SECONDS, testScheduler)
      .map(u -> Arrays.asList(1, 2, 3));
  final Single<List<Integer>> second = Single.just(Collections.emptyList());
  final Single<List<Integer>> third = Single.just(Collections.singletonList(4));
  final Single<List<Integer>> fourth = Single.just(Collections.singletonList(5));

  Single<List<Integer>> subject = Observable
      .fromIterable(Arrays.asList(first, second, third, fourth))
      .concatMapSingle(single -> single)
      .reduce(new ArrayList<>(), (seed, items) -> {
        seed.addAll(items);
        return seed;
      });

  TestObserver<List<Integer>> testObserver = subject.test();
  testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

  System.out.println(testObserver.values());
  testObserver.assertValue(list -> list.equals(Arrays.asList(1, 2, 3, 4, 5)));
  // 5 is currently missing; fourth was never subscribed in the first place
}
