Usage and code examples of the io.reactivex.Observable.blockingIterable() method

x33g5p2x, reposted on 2022-01-25 in Other

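This article collects code examples of the Java method io.reactivex.Observable.blockingIterable() and shows how Observable.blockingIterable() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as practical reference material. Details of the Observable.blockingIterable() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: blockingIterable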

Introduction to Observable.blockingIterable

Converts this Observable into an Iterable.

Scheduler: blockingIterable does not operate by default on a particular Scheduler.
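
The conversion described above turns a push-based source into a pull-based one: the consumer's iterator blocks until the source has emitted something. The following is a minimal plain-Java sketch of that idea using a queue. It is not RxJava's implementation; `BlockingBridge`, `Sink`, and `toBlockingIterable` are hypothetical names introduced for illustration, and error signalling and cancellation are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class BlockingBridge {
    // Sentinel object that marks normal completion of the source
    private static final Object DONE = new Object();

    /** Hypothetical push-style callback, loosely modelled on an Observer. */
    public interface Sink<T> {
        void onNext(T item);   // items must be non-null in this sketch
        void onComplete();
    }

    /** Exposes a push-style source as an Iterable whose iterator blocks while waiting. */
    public static <T> Iterable<T> toBlockingIterable(Consumer<Sink<T>> source) {
        return () -> {
            BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
            // Every iterator() call starts the source again on its own thread
            Thread producer = new Thread(() -> source.accept(new Sink<T>() {
                @Override public void onNext(T item) { queue.add(item); }
                @Override public void onComplete() { queue.add(DONE); }
            }));
            producer.setDaemon(true);
            producer.start();

            return new Iterator<T>() {
                private Object pending; // fetched item or DONE; null means nothing fetched yet

                @Override public boolean hasNext() {
                    if (pending == null) {
                        try {
                            pending = queue.take(); // blocks until an item or DONE arrives
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new IllegalStateException(e);
                        }
                    }
                    return pending != DONE;
                }

                @SuppressWarnings("unchecked")
                @Override public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    T item = (T) pending;
                    pending = null;
                    return item;
                }
            };
        };
    }

    public static void main(String[] args) {
        Iterable<String> words = toBlockingIterable(sink -> {
            sink.onNext("one");
            sink.onNext("two");
            sink.onComplete();
        });
        List<String> seen = new ArrayList<>();
        for (String w : words) {
            seen.add(w);
        }
        System.out.println(seen); // prints [one, two]
    }
}
```

Because the iterator parks the calling thread inside `hasNext()`, this style of consumption suits tests and the edges of an application rather than code that itself runs inside a reactive pipeline.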

Code examples

Code example source: ReactiveX/RxJava

@Override
public Integer apply(Integer v) throws Exception {
  // Blocks the calling thread until the delayed item has been emitted
  Observable.just(1).delay(10, TimeUnit.SECONDS).blockingIterable().iterator().next();
  return v;
}
})

Code example source: ReactiveX/RxJava

Iterator<T> it = blockingIterable().iterator();
while (it.hasNext()) {
  try {

Code example source: ReactiveX/RxJava

/**
 * Converts this {@code Observable} into an {@link Iterable}.
 * <p>
 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingIterable.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code blockingIterable} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return an {@link Iterable} version of this {@code Observable}
 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingIterable() {
  return blockingIterable(bufferSize());
}

Code example source: ReactiveX/RxJava

@Test
public void testWhenMaxConcurrentIsOne() {
  for (int i = 0; i < 100; i++) {
    List<Observable<String>> os = new ArrayList<Observable<String>>();
    os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
    os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
    os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
    List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
    Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
    List<String> actual = new ArrayList<String>();
    while (iter.hasNext()) {
      actual.add(iter.next());
    }
    assertEquals(expected, actual);
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testRangeWithOverflow5() {
  assertFalse(Observable.rangeLong(Long.MIN_VALUE, 0).blockingIterable().iterator().hasNext());
}

Code example source: ReactiveX/RxJava

@Test
public void testRangeWithOverflow5() {
  assertFalse(Observable.range(Integer.MIN_VALUE, 0).blockingIterable().iterator().hasNext());
}

Code example source: ReactiveX/RxJava

@Test
public void testToIterator() {
  Observable<String> obs = Observable.just("one", "two", "three");
  Iterator<String> it = obs.blockingIterable().iterator();
  assertEquals(true, it.hasNext());
  assertEquals("one", it.next());
  assertEquals(true, it.hasNext());
  assertEquals("two", it.next());
  assertEquals(true, it.hasNext());
  assertEquals("three", it.next());
  assertEquals(false, it.hasNext());
}

Code example source: ReactiveX/RxJava

@Test
public void testMergeALotOfSourcesOneByOneSynchronously() {
  int n = 10000;
  List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(n);
  for (int i = 0; i < n; i++) {
    sourceList.add(Observable.just(i));
  }
  Iterator<Integer> it = Observable.merge(Observable.fromIterable(sourceList), 1).blockingIterable().iterator();
  int j = 0;
  while (it.hasNext()) {
    assertEquals((Integer)j, it.next());
    j++;
  }
  assertEquals(j, n);
}

Code example source: ReactiveX/RxJava

@Ignore("subscribe() should not throw")
@Test(expected = TestException.class)
public void testExceptionThrownFromOnSubscribe() {
  Iterable<String> strings = Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      throw new TestException("intentional");
    }
  }).blockingIterable();
  for (String string : strings) {
    // never reaches here
    System.out.println(string);
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testMergeALotOfSourcesOneByOneSynchronouslyTakeHalf() {
  int n = 10000;
  List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(n);
  for (int i = 0; i < n; i++) {
    sourceList.add(Observable.just(i));
  }
  Iterator<Integer> it = Observable.merge(Observable.fromIterable(sourceList), 1).take(n / 2).blockingIterable().iterator();
  int j = 0;
  while (it.hasNext()) {
    assertEquals((Integer)j, it.next());
    j++;
  }
  assertEquals(j, n / 2);
}

Code example source: ReactiveX/RxJava

@Test(expected = TestException.class)
public void testToIteratorWithException() {
  Observable<String> obs = Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onNext("one");
      observer.onError(new TestException());
    }
  });
  Iterator<String> it = obs.blockingIterable().iterator();
  assertEquals(true, it.hasNext());
  assertEquals("one", it.next());
  assertEquals(true, it.hasNext());
  it.next();
}

Code example source: ReactiveX/RxJava

@Test
public void testMaxConcurrent() {
  for (int times = 0; times < 100; times++) {
    int observableCount = 100;
    // Test maxConcurrent from 2 to 12
    int maxConcurrent = 2 + (times % 10);
    AtomicInteger subscriptionCount = new AtomicInteger(0);
    List<Observable<String>> os = new ArrayList<Observable<String>>();
    List<SubscriptionCheckObservable> scos = new ArrayList<SubscriptionCheckObservable>();
    for (int i = 0; i < observableCount; i++) {
      SubscriptionCheckObservable sco = new SubscriptionCheckObservable(subscriptionCount, maxConcurrent);
      scos.add(sco);
      os.add(Observable.unsafeCreate(sco));
    }
    Iterator<String> iter = Observable.merge(os, maxConcurrent).blockingIterable().iterator();
    List<String> actual = new ArrayList<String>();
    while (iter.hasNext()) {
      actual.add(iter.next());
    }
    //            System.out.println("actual: " + actual);
    assertEquals(5 * observableCount, actual.size());
    for (SubscriptionCheckObservable sco : scos) {
      assertFalse(sco.failed);
    }
  }
}

Code example source: akarnokd/RxJava2Jdk8Interop

/**
 * Returns a blocking Stream of the elements of the Observable.
 * <p>
 * Closing the Stream will cancel the flow.
 * @param <T> the value type
 * @return the Function to be used with {@code Observable.to}.
 */
public static <T> Function<Observable<T>, Stream<T>> toStream() {
  return f -> ZeroOneIterator.toStream(f.blockingIterable().iterator());
}
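
The `ZeroOneIterator.toStream` helper used above belongs to that project and is not shown here. As a rough illustration of the general idea, the sketch below wraps any `Iterator` in a `java.util.stream.Stream` using only JDK classes. The class name `IteratorStreams` is hypothetical, and unlike the library method described in the Javadoc above, closing this Stream does not cancel anything.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorStreams {
    /** Wraps an Iterator in a sequential, lazily evaluated Stream. */
    public static <T> Stream<T> toStream(Iterator<T> iterator) {
        // The element count is unknown up front; ORDERED preserves encounter order
        Spliterator<T> spliterator =
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false); // false = not parallel
    }

    public static void main(String[] args) {
        Iterator<String> it = Arrays.asList("one", "two", "three").iterator();
        List<String> upper = toStream(it)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println(upper); // prints [ONE, TWO, THREE]
    }
}
```

When the iterator comes from `blockingIterable()`, the terminal stream operation blocks the calling thread in the same way a plain for-each loop over the Iterable would.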

Code example source: PacktPublishing/Learning-RxJava

@Test
public void testIterable() {
  Observable<String> source =
      Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");
  // Keep only the five-character strings, then block and iterate over them
  Iterable<String> allWithLengthFive = source
      .filter(s -> s.length() == 5)
      .blockingIterable();
  for (String s : allWithLengthFive) {
    assertTrue(s.length() == 5);
  }
}

Code example source: xjdr/xio

@Test
public void testHttp1toHttp1ServerPostMany() throws Exception {
 setupClient(NUM_REQUESTS, true);
 setupFrontBack(false, false);
 verify(multipleAsyncRequests(true).blockingIterable());
 assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
 assertProxiedRequests(NUM_REQUESTS * 2);
}

Code example source: xjdr/xio

@Test
public void testHttp2toHttp1ServerGetMany() throws Exception {
 setupClient(NUM_REQUESTS, true);
 setupFrontBack(true, false);
 verify(multipleAsyncRequests(false).blockingIterable());
 assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
 assertProxiedRequests(NUM_REQUESTS * 2);
}

Code example source: xjdr/xio

@Test
public void testHttp1toHttp2ServerPostMany() throws Exception {
 setupClient(NUM_REQUESTS, false);
 setupFrontBack(false, true);
 verify(multipleAsyncRequests(true).blockingIterable());
 assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
 assertProxiedRequests(NUM_REQUESTS * 2);
}

Code example source: xjdr/xio

@Test
public void testHttp1toHttp1ServerGetMany() throws Exception {
 setupClient(NUM_REQUESTS, true);
 setupFrontBack(false, false);
 verify(multipleAsyncRequests(false).blockingIterable());
 assertEquals(NUM_REQUESTS * 2, backEnd1.getRequestCount());
 assertProxiedRequests(NUM_REQUESTS * 2);
}
