
x33g5p2x  于2022-01-25 转载在 其他  



[英]Invokes a method on each item emitted by this Observable and blocks until the Observable completes.

Note: This will block even if the underlying Observable is asynchronous.

This is similar to Observable#subscribe(Observer), but it blocks. Because it blocks it does not need the Observer#onComplete() or Observer#onError(Throwable) methods. If the underlying Observable terminates with an error, rather than calling onError, this method will throw an exception.

The difference between this method and #subscribe(Consumer) is that the onNext action is executed on the current (blocked) thread instead of the emission thread.

Scheduler: blockingForEach does not operate by default on a particular Scheduler.

Error handling: If the source signals an error, the operator wraps a checked Exception into a RuntimeException and throws that. Otherwise, RuntimeExceptions and Errors are rethrown as they are.


代码示例来源:origin: ReactiveX/RxJava

private static <K, V> Map<K, Collection<V>> toMap(Observable<GroupedObservable<K, V>> observable) {
  final ConcurrentHashMap<K, Collection<V>> result = new ConcurrentHashMap<K, Collection<V>>();
  observable.blockingForEach(new Consumer<GroupedObservable<K, V>>() {
    public void accept(final GroupedObservable<K, V> o) {
      result.put(o.getKey(), new ConcurrentLinkedQueue<V>());
      o.subscribe(new Consumer<V>() {
        public void accept(V v) {
  return result;

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = TestException.class)
public void blockingForEachThrows() {
  .blockingForEach(new Consumer<Integer>() {
    public void accept(Integer e) throws Exception {
      throw new TestException();

代码示例来源:origin: ReactiveX/RxJava

private static <T> List<List<T>> toLists(Observable<Observable<T>> observables) {
  final List<List<T>> lists = new ArrayList<List<T>>();
  Observable.concat( Function<Observable<T>, Observable<List<T>>>() {
    public Observable<List<T>> apply(Observable<T> xs) {
      return xs.toList().toObservable();
      .blockingForEach(new Consumer<List<T>>() {
        public void accept(List<T> xs) {
  return lists;

代码示例来源:origin: ReactiveX/RxJava

  public Integer apply(Integer v) throws Exception {
    Observable.just(1).delay(10, TimeUnit.SECONDS).blockingForEach(Functions.emptyConsumer());
    return v;

代码示例来源:origin: ReactiveX/RxJava

 * This won't compile if super/extends isn't done correctly on generics.
public void testCovarianceOfZip() {
  Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
  Observable<CoolRating> ratings = Observable.just(new CoolRating());
  Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
  Observable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
  Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);

代码示例来源:origin: ReactiveX/RxJava

 * This won't compile if super/extends isn't done correctly on generics.
public void testCovarianceOfCombineLatest() {
  Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
  Observable<CoolRating> ratings = Observable.just(new CoolRating());
  Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(extendedAction);
  Observable.<Media, Rating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine);

代码示例来源:origin: ReactiveX/RxJava

 * Confirm that running on a NewThreadScheduler uses the same thread for the entire stream.
public void testObserveOnWithNewThreadScheduler() {
  final AtomicInteger count = new AtomicInteger();
  final int _multiple = 99;
  Observable.range(1, 100000).map(new Function<Integer, Integer>() {
    public Integer apply(Integer t1) {
      return t1 * _multiple;
  .blockingForEach(new Consumer<Integer>() {
    public void accept(Integer t1) {
      assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
      // FIXME toBlocking methods run on the current thread
      String name = Thread.currentThread().getName();
      assertFalse("Wrong thread name: " + name, name.startsWith("Rx"));

代码示例来源:origin: ReactiveX/RxJava

 * Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
public void testObserveOnWithThreadPoolScheduler() {
  final AtomicInteger count = new AtomicInteger();
  final int _multiple = 99;
  Observable.range(1, 100000).map(new Function<Integer, Integer>() {
    public Integer apply(Integer t1) {
      return t1 * _multiple;
  .blockingForEach(new Consumer<Integer>() {
    public void accept(Integer t1) {
      assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
      // FIXME toBlocking methods run on the caller's thread
      String name = Thread.currentThread().getName();
      assertFalse("Wrong thread name: " + name, name.startsWith("Rx"));

代码示例来源:origin: ReactiveX/RxJava

.blockingForEach(new Consumer<Integer>() {

代码示例来源:origin: ReactiveX/RxJava

  public void testWindow() {
    final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();

      Observable.just(1, 2, 3, 4, 5, 6)
      .map(new Function<Observable<Integer>, Observable<List<Integer>>>() {
        public Observable<List<Integer>> apply(Observable<Integer> xs) {
          return xs.toList().toObservable();
    .blockingForEach(new Consumer<List<Integer>>() {
      public void accept(List<Integer> xs) {

    assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 });
    assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 });
    assertEquals(2, lists.size());


代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testMultiTake() {
  final AtomicInteger count = new AtomicInteger();
  Observable.unsafeCreate(new ObservableSource<Integer>() {
    public void subscribe(Observer<? super Integer> observer) {
      Disposable bs = Disposables.empty();
      for (int i = 0; !bs.isDisposed(); i++) {
        System.out.println("Emit: " + i);
  }).take(100).take(1).blockingForEach(new Consumer<Integer>() {
    public void accept(Integer t1) {
      System.out.println("Receive: " + t1);
  assertEquals(1, count.get());

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 5000)
public void toObservableNormal() {

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 5000, expected = TestException.class)
public void toObservableError() {

代码示例来源:origin: ReactiveX/RxJava

  public void testUnsubscribeScan() throws Exception {

    ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
    .scan(new HashMap<String, String>(), new BiFunction<HashMap<String, String>, Event, HashMap<String, String>>() {
      public HashMap<String, String> apply(HashMap<String, String> accum, Event perInstanceEvent) {
        accum.put("instance", perInstanceEvent.instanceId);
        return accum;
    .blockingForEach(new Consumer<HashMap<String, String>>() {
      public void accept(HashMap<String, String> pv) {

    Thread.sleep(200); // make sure the event streams receive their interrupt

代码示例来源:origin: ReactiveX/RxJava

Observable.merge(source).take(6).blockingForEach(new Consumer<Long>() {

代码示例来源:origin: ReactiveX/RxJava

}).blockingForEach(new Consumer<String>() {

代码示例来源:origin: ReactiveX/RxJava

}).blockingForEach(new Consumer<String>() {

代码示例来源:origin: ReactiveX/RxJava

.blockingForEach(new Consumer<Object>() {
  public void accept(Object pv) {

代码示例来源:origin: ReactiveX/RxJava

public void testTakeUnsubscribesOnGroupBy() throws Exception {
    ObservableEventStream.getEventStream("HTTP-ClusterA", 50),
    ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
  // group by type (2 clusters)
  .groupBy(new Function<Event, String>() {
    public String apply(Event event) {
      return event.type;
  .blockingForEach(new Consumer<GroupedObservable<String, Event>>() {
    public void accept(GroupedObservable<String, Event> v) {
      v.take(1).subscribe();  // FIXME groups need consumption to a certain degree to cancel upstream
  System.out.println("**** finished");
  Thread.sleep(200); // make sure the event streams receive their interrupt

代码示例来源:origin: ReactiveX/RxJava

.blockingForEach(new Consumer<Object>() {
  public void accept(Object pv) {

