Usage and code examples of the io.reactivex.Observable.flatMapCompletable() method

x33g5p2x, reposted on 2022-01-25 under "Other"

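This article collects code examples for the io.reactivex.Observable.flatMapCompletable() method in Java and shows how Observable.flatMapCompletable() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.flatMapCompletable() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: flatMapCompletable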

Introduction to Observable.flatMapCompletable

Maps each element of the upstream Observable into CompletableSources, subscribes to them and waits until the upstream and all CompletableSources complete.

Scheduler: flatMapCompletable does not operate by default on a particular Scheduler.
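
To make the "wait until the upstream and all CompletableSources complete" behaviour concrete, here is a rough JDK-only analogy. It is not RxJava code and says nothing about how the operator is implemented: `CompletableFuture<Void>` stands in for `CompletableSource`, a `List` stands in for the upstream Observable, and the class `FlatMapCompletableAnalogy` with its `demo` method is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

// JDK-only analogy, NOT RxJava: CompletableFuture<Void> stands in for CompletableSource.
class FlatMapCompletableAnalogy {

    // Maps every upstream element to an asynchronous task and returns a future
    // that completes only after all mapped tasks have completed.
    static <T> CompletableFuture<Void> flatMapCompletable(
            List<T> upstream, Function<? super T, CompletableFuture<Void>> mapper) {
        List<CompletableFuture<Void>> inner = upstream.stream()
                .map(mapper)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(inner.toArray(new CompletableFuture<?>[0]));
    }

    // Hypothetical demo: "saves" five ids asynchronously and returns how many were saved.
    static int demo() {
        AtomicInteger saved = new AtomicInteger();
        flatMapCompletable(
                Arrays.asList(1, 2, 3, 4, 5),
                id -> CompletableFuture.runAsync(saved::incrementAndGet))
            .join(); // block until every inner task is done
        return saved.get();
    }
}
```

As with the real operator, the values themselves are discarded; the only thing the caller observes is that every mapped task has finished (or that one has failed).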

Code examples

Code example source: origin: ReactiveX/RxJava

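// Fragment: the opening of the enclosing call was cut off in the source. What remains is the
// body of an anonymous function, passed as the first argument of that call, which applies
// flatMapCompletable to the Observable under test.
@Override
  public Object apply(Observable<Integer> o) throws Exception {
    return o.flatMapCompletable(new Function<Integer, CompletableSource>() {
      @Override
      public CompletableSource apply(Integer v) throws Exception {
        return Completable.complete();
      }
    });
  }
}, false, 1, null);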

Code example source: origin: ReactiveX/RxJava

/**
 * Maps each element of the upstream Observable into CompletableSources, subscribes to them and
 * waits until the upstream and all CompletableSources complete.
 * <p>
 * <img width="640" height="424" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMapCompletable.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param mapper the function that received each source value and transforms them into CompletableSources.
 * @return the new Completable instance
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
  return flatMapCompletable(mapper, false);
}
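
The one-argument overload above delegates to the two-argument form with the flag set to `false`. In RxJava that flag is `delayErrors`: with `false` the first failure ends the flow at once, while `true` holds failures back until everything has terminated and then reports them together. The following is a simplified, synchronous JDK-only model of that difference, not RxJava code. `DelayErrorsModel`, `run` and `failureCount` are hypothetical names, and a `Runnable` stands in for a `CompletableSource`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// JDK-only, synchronous model, NOT RxJava: a Runnable that may throw stands in
// for a CompletableSource, and a plain List stands in for the upstream.
class DelayErrorsModel {

    // Runs the mapped task of every element and returns the errors that a
    // subscriber would be told about.
    //  - delayErrors == false: stop at the first failure and report only that one.
    //  - delayErrors == true : run every task and report all failures together
    //    (RxJava wraps multiple failures in a CompositeException).
    static <T> List<Throwable> run(List<T> upstream,
                                   Function<? super T, Runnable> mapper,
                                   boolean delayErrors) {
        List<Throwable> errors = new ArrayList<>();
        for (T item : upstream) {
            try {
                mapper.apply(item).run();
            } catch (RuntimeException ex) {
                errors.add(ex);
                if (!delayErrors) {
                    break; // fail fast: remaining elements are not processed
                }
            }
        }
        return errors;
    }

    // Hypothetical demo: three elements, each mapped to a task that always fails.
    static int failureCount(boolean delayErrors) {
        return run(Arrays.asList(1, 2, 3),
                   v -> () -> { throw new IllegalStateException("task " + v); },
                   delayErrors).size();
    }
}
```

With this model `failureCount(false)` reports a single failure and `failureCount(true)` reports all three, which mirrors what the `assertFailure(TestException.class)` and `assertFailure(CompositeException.class)` tests elsewhere on this page check.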

Code example source: origin: ReactiveX/RxJava

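// Fragment: the opening of the enclosing call was cut off in the source. What remains is the
// body of an anonymous function, passed as the first argument of that call, which applies
// flatMapCompletable and converts the result back to an Observable.
@Override
  public Object apply(Observable<Integer> o) throws Exception {
    return o.flatMapCompletable(new Function<Integer, CompletableSource>() {
      @Override
      public CompletableSource apply(Integer v) throws Exception {
        return Completable.complete();
      }
    }).toObservable();
  }
}, false, 1, null);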

Code example source: origin: ReactiveX/RxJava

@Test
public void disposed() {
  TestHelper.checkDisposed(Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }));
}

Code example source: origin: ReactiveX/RxJava

@Test
public void innerObserver() {
  Observable.range(1, 3)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return new Completable() {
        @Override
        protected void subscribeActual(CompletableObserver observer) {
          observer.onSubscribe(Disposables.empty());
          assertFalse(((Disposable)observer).isDisposed());
          ((Disposable)observer).dispose();
          assertTrue(((Disposable)observer).isDisposed());
        }
      };
    }
  })
  .test();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalDelayError() {
  Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }, true)
  .test()
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normal() {
  Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  })
  .test()
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void disposedObservable() {
  TestHelper.checkDisposed(Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }).toObservable());
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalDelayErrorObservable() {
  Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }, true).toObservable()
  .test()
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalObservable() {
  Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }).toObservable()
  .test()
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalAsync() {
  Observable.range(1, 1000)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
    }
  })
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalDelayInnerErrorAll() {
  TestObserver<Void> to = Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.error(new TestException());
    }
  }, true)
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  for (int i = 0; i < 10; i++) {
    TestHelper.assertError(errors, i, TestException.class);
  }
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalAsyncObservable() {
  Observable.range(1, 1000)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
    }
  }).toObservable()
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalDelayInnerErrorAllObservable() {
  TestObserver<Integer> to = Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.error(new TestException());
    }
  }, true).<Integer>toObservable()
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  for (int i = 0; i < 10; i++) {
    TestHelper.assertError(errors, i, TestException.class);
  }
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalNonDelayErrorOuter() {
  Observable.range(1, 10).concatWith(Observable.<Integer>error(new TestException()))
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }, false)
  .test()
  .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void fused() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  })
  .<Integer>toObservable()
  .subscribe(to);
  to
  .assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void fusedObservable() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  Observable.range(1, 10)
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }).<Integer>toObservable()
  .subscribe(to);
  to
  .assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalNonDelayErrorOuterObservable() {
  Observable.range(1, 10).concatWith(Observable.<Integer>error(new TestException()))
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.complete();
    }
  }, false).toObservable()
  .test()
  .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalDelayErrorAll() {
  TestObserver<Void> to = Observable.range(1, 10).concatWith(Observable.<Integer>error(new TestException()))
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.error(new TestException());
    }
  }, true)
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  for (int i = 0; i < 11; i++) {
    TestHelper.assertError(errors, i, TestException.class);
  }
}

Code example source: origin: ReactiveX/RxJava

@Test
public void normalDelayErrorAllObservable() {
  TestObserver<Integer> to = Observable.range(1, 10).concatWith(Observable.<Integer>error(new TestException()))
  .flatMapCompletable(new Function<Integer, CompletableSource>() {
    @Override
    public CompletableSource apply(Integer v) throws Exception {
      return Completable.error(new TestException());
    }
  }, true).<Integer>toObservable()
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
  for (int i = 0; i < 11; i++) {
    TestHelper.assertError(errors, i, TestException.class);
  }
}
