
x33g5p2x  于2022-01-25 转载在 其他  



[英]Modifies the source ObservableSource so that it notifies an Observer for each item and terminal event it emits.

In case the onError of the supplied observer throws, the downstream will receive a composite exception containing the original exception and the exception thrown by onError. If either the onNext or the onComplete method of the supplied observer throws, the downstream will be terminated and will receive this thrown exception.

Scheduler: doOnEach does not operate by default on a particular Scheduler.


代码示例来源:origin: ReactiveX/RxJava

  public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
    return o.doOnEach(new TestObserver<Object>());

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnEachSupplierNull() {

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnEachSubscriberNull() {

代码示例来源:origin: ReactiveX/RxJava

 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onNext}.
 * <p>
 * <img width="640" height="360" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param onNext
 *            the action to invoke when the source ObservableSource calls {@code onNext}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
  return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);

代码示例来源:origin: ReactiveX/RxJava

 * Modifies the source ObservableSource so that it invokes an action if it calls {@code onError}.
 * <p>
 * In case the {@code onError} action throws, the downstream will receive a composite exception containing
 * the original exception and the exception thrown by {@code onError}.
 * <p>
 * <img width="640" height="355" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param onError
 *            the action to invoke if the source ObservableSource calls {@code onError}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
public final Observable<T> doOnError(Consumer<? super Throwable> onError) {
  return doOnEach(Functions.emptyConsumer(), onError, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);

代码示例来源:origin: ReactiveX/RxJava

 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete}.
 * <p>
 * <img width="640" height="358" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param onComplete
 *            the action to invoke when the source ObservableSource calls {@code onComplete}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
public final Observable<T> doOnComplete(Action onComplete) {
  return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), onComplete, Functions.EMPTY_ACTION);

代码示例来源:origin: ReactiveX/RxJava

public void testDoOnEach() {
  Observable<String> base = Observable.just("a", "b", "c");
  Observable<String> doOnEach = base.doOnEach(sideEffectObserver);
  // ensure the leaf Observer is still getting called
  verify(subscribedObserver, never()).onError(any(Throwable.class));
  verify(subscribedObserver, times(1)).onNext("a");
  verify(subscribedObserver, times(1)).onNext("b");
  verify(subscribedObserver, times(1)).onNext("c");
  verify(subscribedObserver, times(1)).onComplete();
  // ensure our injected Observer is getting called
  verify(sideEffectObserver, never()).onError(any(Throwable.class));
  verify(sideEffectObserver, times(1)).onNext("a");
  verify(sideEffectObserver, times(1)).onNext("b");
  verify(sideEffectObserver, times(1)).onNext("c");
  verify(sideEffectObserver, times(1)).onComplete();

代码示例来源:origin: redisson/redisson

 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onNext}.
 * <p>
 * <img width="640" height="360" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param onNext
 *            the action to invoke when the source ObservableSource calls {@code onNext}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
  return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);

代码示例来源:origin: ReactiveX/RxJava

public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).doOnEach(new TestObserver<Integer>()));

代码示例来源:origin: ReactiveX/RxJava

public void testDoOnEachWithError() {
  Observable<String> base = Observable.just("one", "fail", "two", "three", "fail");
  Observable<String> errs = Function<String, String>() {
    public String apply(String s) {
      if ("fail".equals(s)) {
        throw new RuntimeException("Forced Failure");
      return s;
  Observable<String> doOnEach = errs.doOnEach(sideEffectObserver);
  verify(subscribedObserver, times(1)).onNext("one");
  verify(subscribedObserver, never()).onNext("two");
  verify(subscribedObserver, never()).onNext("three");
  verify(subscribedObserver, never()).onComplete();
  verify(subscribedObserver, times(1)).onError(any(Throwable.class));
  verify(sideEffectObserver, times(1)).onNext("one");
  verify(sideEffectObserver, never()).onNext("two");
  verify(sideEffectObserver, never()).onNext("three");
  verify(sideEffectObserver, never()).onComplete();
  verify(sideEffectObserver, times(1)).onError(any(Throwable.class));

代码示例来源:origin: ReactiveX/RxJava

 * Registers an {@link Action} to be called when this ObservableSource invokes either
 * {@link Observer#onComplete onComplete} or {@link Observer#onError onError}.
 * <p>
 * <img width="640" height="310" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param onFinally
 *            an {@link Action} to be invoked when the source ObservableSource finishes
 * @return an Observable that emits the same items as the source ObservableSource, then invokes the
 *         {@link Action}
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 * @see #doOnTerminate(Action)
public final Observable<T> doAfterTerminate(Action onFinally) {
  ObjectHelper.requireNonNull(onFinally, "onFinally is null");
  return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, onFinally);

代码示例来源:origin: ReactiveX/RxJava

 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete} or
 * {@code onError}.
 * <p>
 * <img width="640" height="327" src="" alt="">
 * <p>
 * This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
 * {@code onError} notification.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param onTerminate
 *            the action to invoke when the source ObservableSource calls {@code onComplete} or {@code onError}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
 * @see #doAfterTerminate(Action)
public final Observable<T> doOnTerminate(final Action onTerminate) {
  ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
  return doOnEach(Functions.emptyConsumer(),
      Functions.actionConsumer(onTerminate), onTerminate,

代码示例来源:origin: redisson/redisson

 * Modifies the source ObservableSource so that it invokes an action when it calls {@code onComplete}.
 * <p>
 * <img width="640" height="358" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param onComplete
 *            the action to invoke when the source ObservableSource calls {@code onComplete}
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
public final Observable<T> doOnComplete(Action onComplete) {
  return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), onComplete, Functions.EMPTY_ACTION);

代码示例来源:origin: ReactiveX/RxJava

 * Modifies the source ObservableSource so that it invokes an action for each notification (item or terminal event) it emits.
 * <p>
 * <img width="640" height="310" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param onNotification
 *            the action to invoke with a {@link Notification} for each item or terminal event
 *            signalled by the source ObservableSource
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification) {
  ObjectHelper.requireNonNull(onNotification, "consumer is null");
  return doOnEach(

代码示例来源:origin: ReactiveX/RxJava

 * Modifies the source ObservableSource so that it notifies an Observer for each item and terminal event it emits.
 * <p>
 * In case the {@code onError} of the supplied observer throws, the downstream will receive a composite
 * exception containing the original exception and the exception thrown by {@code onError}. If either the
 * {@code onNext} or the {@code onComplete} method of the supplied observer throws, the downstream will be
 * terminated and will receive this thrown exception.
 * <p>
 * <img width="640" height="310" src="" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param observer
 *            the observer to be notified about onNext, onError and onComplete events on its
 *            respective methods before the actual downstream Observer gets notified.
 * @return the source ObservableSource with the side-effecting behavior applied
 * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
public final Observable<T> doOnEach(final Observer<? super T> observer) {
  ObjectHelper.requireNonNull(observer, "observer is null");
  return doOnEach(

代码示例来源:origin: ReactiveX/RxJava

public void testWithCombineLatestIssue1717() throws InterruptedException {
  final CountDownLatch latch = new CountDownLatch(1);
  final AtomicInteger count = new AtomicInteger();
  final int SIZE = 2000;
  Observable<Long> timer = Observable.interval(0, 1, TimeUnit.MILLISECONDS)
      .doOnEach(new Consumer<Notification<Long>>() {
        public void accept(Notification<Long> n) {
            //                        System.out.println(n);
            if (count.incrementAndGet() >= SIZE) {
  TestObserver<Long> to = new TestObserver<Long>();
  Observable.combineLatest(timer, Observable.<Integer> never(), new BiFunction<Long, Integer, Long>() {
    public Long apply(Long t1, Integer t2) {
      return t1;
  if (!latch.await(SIZE + 1000, TimeUnit.MILLISECONDS)) {
    fail("timed out");
  assertEquals(SIZE, count.get());

代码示例来源:origin: ReactiveX/RxJava

public void testDelayEmitsEverything() {
  Observable<Integer> source = Observable.range(1, 5);
  Observable<Integer> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
  delayed = delayed.doOnEach(new Consumer<Notification<Integer>>() {
    public void accept(Notification<Integer> t1) {
  TestObserver<Integer> observer = new TestObserver<Integer>();
  // all will be delivered after 500ms since range does not delay between them
  scheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS);
  observer.assertValues(1, 2, 3, 4, 5);

代码示例来源:origin: ReactiveX/RxJava

}).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

}).doOnEach(new Consumer<Notification<String>>() {

代码示例来源:origin: ReactiveX/RxJava

public Observable<String> apply(final GroupedObservable<Integer, Integer> group) {
  if (group.getKey() < 3) {
    return Function<Integer, String>() {
      public String apply(Integer t1) {
        return "first groups: " + t1;
        // must take(2) so an onComplete + unsubscribe happens on these first 2 groups
        .take(2).doOnComplete(new Action() {
          public void run() {
  } else {
    return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Function<Integer, String>() {
      public String apply(Integer t1) {
        return "last group: " + t1;
    }).doOnEach(new Consumer<Notification<String>>() {
      public void accept(Notification<String> t1) {
        System.err.println("subscribeOn notification => " + t1);

