Usage and code examples of the io.reactivex.Observable.concatMap() method

x33g5p2x, reposted on 2022-01-25 in Other

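This article collects code examples for the Java method io.reactivex.Observable.concatMap() and shows how Observable.concatMap() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.concatMap() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: concatMap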

Introduction to Observable.concatMap

Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items that result from concatenating those resulting ObservableSources.

Scheduler: concatMap does not operate by default on a particular Scheduler.
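
The ordering described above can be illustrated with a small plain-Java model. This is only a sketch of the concatenation semantics using lists, not RxJava's implementation: the class `ConcatMapSketch` and its `concatMap` helper are hypothetical names introduced for illustration, and the model ignores asynchrony, disposal, and prefetching.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of concatMap's ordering; this is NOT RxJava's implementation.
final class ConcatMapSketch {

    // Hypothetical helper: maps each source item to an inner sequence and
    // appends the inner sequences one after another, in source order.
    static <T, R> List<R> concatMap(List<T> source,
            Function<? super T, ? extends Iterable<? extends R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            // The whole inner sequence is drained before the next source item is mapped.
            for (R inner : mapper.apply(item)) {
                out.add(inner);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Analogous to mapping each v to Observable.range(v, 2), i.e. [v, v + 1].
        List<Integer> result = concatMap(Arrays.asList(1, 2, 3, 4),
                v -> Arrays.asList(v, v + 1));
        System.out.println(result); // prints [1, 2, 2, 3, 3, 4, 4, 5]
    }
}
```

The printed sequence matches the values asserted in the `fusionWithConcatMap` test in this article, where the inner sources are `Observable.range(v, 2)`.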

Code examples

Code example source: ReactiveX/RxJava

/**
 * Returns a new Observable that emits items resulting from applying a function that you supply to each item
 * emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items
 * that result from concatenating those resulting ObservableSources.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatMap} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <R> the type of the inner ObservableSource sources and thus the output type
 * @param mapper
 *            a function that, when applied to an item emitted by the source ObservableSource, returns an
 *            ObservableSource
 * @return an Observable that emits the result of applying the transformation function to each item emitted
 *         by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
  return concatMap(mapper, 2);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void concatMapNull() {
  just1.concatMap(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void concatMapReturnsNull() {
  just1.concatMap(new Function<Integer, Observable<Object>>() {
    @Override
    public Observable<Object> apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void concatMapJustSource() {
  Observable.just(0)
  .concatMap(new Function<Object, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Object v) throws Exception {
      return Observable.just(1);
    }
  }, 16)
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.<Integer>just(1).hide()
  .concatMap(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v) throws Exception {
      return Observable.error(new TestException());
    }
  }));
}

Code example source: ReactiveX/RxJava

@Test
public void fusedPollThrows() {
  Observable.just(1)
  .map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .concatMap(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v) throws Exception {
      return Observable.range(v, 2);
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void innerError() {
  Observable.<Integer>just(1).hide()
  .concatMap(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v) throws Exception {
      return Observable.error(new TestException());
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void mapperThrows() {
  Observable.just(1).hide()
  .concatMap(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v) throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

/**
 * Returns an Observable that concatenate each item emitted by the source ObservableSource with the values in an
 * Iterable corresponding to that item that is generated by a selector.
 * <p>
 * <img width="640" height="275" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMapIterable.o.png" alt="">
 *
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <U>
 *            the type of item emitted by the resulting ObservableSource
 * @param mapper
 *            a function that returns an Iterable sequence of values for when given an item emitted by the
 *            source ObservableSource
 * @param prefetch
 *            the number of elements to prefetch from the current Observable
 * @return an Observable that emits the results of concatenating the items emitted by the source ObservableSource with
 *         the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> concatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch) {
  ObjectHelper.requireNonNull(mapper, "mapper is null");
  ObjectHelper.verifyPositive(prefetch, "prefetch");
  return concatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), prefetch);
}
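
As a rough illustration of what the `concatMapIterable` Javadoc above describes, the following plain-Java model flattens one Iterable per source item in source order. It is a sketch under stated assumptions, not the RxJava code: `ConcatMapIterableSketch` is a hypothetical class, the exception type and message used for a non-positive `prefetch` are assumptions, and `prefetch` is only validated here because a list-based model has nothing to prefetch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Plain-Java model of concatMapIterable; NOT the RxJava implementation.
final class ConcatMapIterableSketch {

    // Hypothetical helper that mirrors the argument checks of the method above.
    static <T, U> List<U> concatMapIterable(List<T> source,
            Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch) {
        Objects.requireNonNull(mapper, "mapper is null");
        if (prefetch <= 0) {
            // Assumed failure mode for a non-positive prefetch value.
            throw new IllegalArgumentException("prefetch must be positive: " + prefetch);
        }
        List<U> out = new ArrayList<>();
        for (T item : source) {
            // Append every value of this item's Iterable before moving on.
            for (U value : mapper.apply(item)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Each string is mapped to the list of its one-character substrings.
        List<String> letters = concatMapIterable(
                Arrays.asList("ab", "cd"),
                s -> Arrays.asList(s.split("")),
                2);
        System.out.println(letters); // prints [a, b, c, d]
    }
}
```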

Code example source: ReactiveX/RxJava

.concatMap(new Function<byte[], Observable<byte[]>>() {
  @Override
  public Observable<byte[]> apply(byte[] v) throws Exception {

Code example source: ReactiveX/RxJava

.concatMap(func).subscribe(new DefaultObserver<Integer>() {
  @Override
  public void onNext(Integer t) {

Code example source: ReactiveX/RxJava

@Test
public void fusionWithConcatMap() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.fromIterable(Arrays.asList(1, 2, 3, 4)).concatMap(
  new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v) {
      return Observable.range(v, 2);
    }
  }).subscribe(to);
  to.assertValues(1, 2, 2, 3, 3, 4, 4, 5);
  to.assertNoErrors();
  to.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void concatMapErrorEmptySource() {
  assertSame(Observable.empty(), Observable.<Object>empty()
      .concatMap(new Function<Object, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Object v) throws Exception {
          return Observable.just(1);
        }
      }, 16));
}

Code example source: ReactiveX/RxJava

/** Issue #2844: wrong target of request. */
@Test(timeout = 3000)
public void testRepeatRetarget() {
  final List<Integer> concatBase = new ArrayList<Integer>();
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.just(1, 2)
  .repeat(5)
  .concatMap(new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer x) {
      System.out.println("testRepeatRetarget -> " + x);
      concatBase.add(x);
      return Observable.<Integer>empty()
          .delay(200, TimeUnit.MILLISECONDS);
    }
  })
  .subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  to.assertNoValues();
  assertEquals(Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2, 1, 2), concatBase);
}

Code example source: ReactiveX/RxJava

@Test
public void noCancelPrevious() {
  // Counts how many inner sources get disposed; none should be.
  final AtomicInteger counter = new AtomicInteger();

  Observable.range(1, 5)
  .concatMap(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v) throws Exception {
      return Observable.just(v).doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
          counter.getAndIncrement();
        }
      });
    }
  })
  .test()
  .assertResult(1, 2, 3, 4, 5);

  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void concatReportsDisposedOnComplete() {
  final Disposable[] disposable = { null };
  Observable.fromArray(Observable.just(1), Observable.just(2))
  .hide()
  .concatMap(Functions.<Observable<Integer>>identity())
  .subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
      disposable[0] = d;
    }
    @Override
    public void onNext(Integer t) {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
  });
  assertTrue(disposable[0].isDisposed());
}

Code example source: ReactiveX/RxJava

@Test
public void mainError() {
  Observable.<Integer>error(new TestException())
  .concatMap(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v) throws Exception {
      return Observable.range(v, 2);
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test//(timeout = 100000)
public void concatMapRangeAsyncLoopIssue2876() {
  final long durationSeconds = 2;
  final long startTime = System.currentTimeMillis();
  for (int i = 0;; i++) {
    // only run this for at most durationSeconds (2 seconds)
    if (System.currentTimeMillis() - startTime > TimeUnit.SECONDS.toMillis(durationSeconds)) {
      return;
    }
    if (i % 1000 == 0) {
      System.out.println("concatMapRangeAsyncLoop > " + i);
    }
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.range(0, 1000)
    .concatMap(new Function<Integer, Observable<Integer>>() {
      @Override
      public Observable<Integer> apply(Integer t) {
        return Observable.fromIterable(Arrays.asList(t));
      }
    })
    .observeOn(Schedulers.computation()).subscribe(to);
    to.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS);
    to.assertTerminated();
    to.assertNoErrors();
    assertEquals(1000, to.valueCount());
    assertEquals((Integer)999, to.values().get(999));
  }
}

Code example source: ReactiveX/RxJava

@Test
public void outputFusedOneSignal() {
  final BehaviorSubject<Integer> bs = BehaviorSubject.createDefault(1);
  bs.observeOn(ImmediateThinScheduler.INSTANCE)
  .concatMap(new Function<Integer, ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> apply(Integer v)
        throws Exception {
      return Observable.just(v + 1);
    }
  })
  .subscribeWith(new TestObserver<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 2) {
        bs.onNext(2);
      }
    }
  })
  .assertValuesOnly(2, 3);
}

Code example source: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void concatReportsDisposedOnError() {
  final Disposable[] disposable = { null };
  Observable.fromArray(Observable.just(1), Observable.<Integer>error(new TestException()))
  .hide()
  .concatMap(Functions.<Observable<Integer>>identity())
  .subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
      disposable[0] = d;
    }
    @Override
    public void onNext(Integer t) {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
  });
  assertTrue(disposable[0].isDisposed());
}
