Usage and code examples of the io.reactivex.Observable.blockingLatest() method

x33g5p2x, reposted on 2022-01-25 under Other

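This article collects code examples for the Java method io.reactivex.Observable.blockingLatest() and shows how Observable.blockingLatest() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.blockingLatest() method are as follows: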
Package: io.reactivex
Class name: Observable
Method name: blockingLatest

Introduction to Observable.blockingLatest

Returns an Iterable that returns the latest item emitted by this Observable, waiting if necessary for one to become available.

If this Observable produces items faster than Iterator.next takes them, onNext events might be skipped, but onError or onComplete events are not.

Note also that an onNext directly followed by onComplete might hide the onNext event.

Scheduler: blockingLatest does not operate by default on a particular Scheduler.
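
To make the skipping behavior described above concrete, here is a minimal sketch in plain Java that uses only the standard library. It is a simplified model of a single-slot "latest value" iterator, not RxJava's actual implementation: the class name LatestSlot and its methods are hypothetical, and error signals (onError) are left out for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of a "latest value" iterator (not RxJava code).
final class LatestSlot<T> implements Iterator<T> {
    // Marker stored in the slot when the producer completes.
    private static final Object COMPLETE = new Object();

    private final AtomicReference<Object> slot = new AtomicReference<>();
    private final Semaphore available = new Semaphore(0);
    private Object current;  // signal taken by hasNext() but not yet returned by next()
    private boolean done;

    // Producer side: every signal overwrites whatever is still in the slot.
    public void onNext(T item) { publish(item); }
    public void onComplete() { publish(COMPLETE); }

    private void publish(Object signal) {
        // Release a permit only if the slot was empty, so at most one permit is pending.
        if (slot.getAndSet(signal) == null) {
            available.release();
        }
    }

    @Override
    public boolean hasNext() {
        if (done) {
            return false;
        }
        if (current == null) {
            try {
                available.acquire();  // block until the producer has published a signal
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            current = slot.getAndSet(null);
            if (current == COMPLETE) {
                done = true;
                current = null;
            }
        }
        return !done;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T item = (T) current;
        current = null;
        return item;
    }

    // Single-threaded walk-through of the skipping rules.
    static List<Integer> demo() {
        LatestSlot<Integer> it = new LatestSlot<>();
        List<Integer> seen = new ArrayList<>();
        it.onNext(1);
        seen.add(it.next());
        it.onNext(2);
        it.onNext(3);      // overwrites 2 before the consumer asks for it
        seen.add(it.next());
        it.onNext(4);
        it.onComplete();   // directly follows onNext(4) and hides it
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo, 2 is overwritten by 3 before the consumer asks for a value, and 4 is hidden by the completion signal that directly follows it, so the consumer sees only 1 and 3.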

Code examples

Code example source: ReactiveX/RxJava

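// Fragment of an anonymous class passed to an enclosing call that the excerpt does not show
@Override
public Integer apply(Integer v) throws Exception {
  // hasNext() blocks until the delayed item arrives
  Observable.just(1).delay(10, TimeUnit.SECONDS).blockingLatest().iterator().hasNext();
  return v;
}
})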

Code example source: ReactiveX/RxJava

@Test
public void interrupted() {
  Iterator<Object> it = Observable.never().blockingLatest().iterator();
  Thread.currentThread().interrupt();
  try {
    it.hasNext();
  } catch (RuntimeException ex) {
    assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException);
  }
  Thread.interrupted();
}

Code example source: ReactiveX/RxJava

@Test(expected = UnsupportedOperationException.class)
public void remove() {
  Observable.never().blockingLatest().iterator().remove();
}

Code example source: ReactiveX/RxJava

@Test(expected = NoSuchElementException.class)
public void empty() {
  Observable.empty().blockingLatest().iterator().next();
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testEmpty() {
  Observable<Long> source = Observable.<Long> empty();
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  Assert.assertEquals(false, it.hasNext());
  it.next();
}

Code example source: ReactiveX/RxJava

@Test(/* timeout = 1000, */expected = RuntimeException.class)
public void testHasNextThrows() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  it.hasNext();
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000, expected = RuntimeException.class)
public void testNextThrows() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  it.next();
}

Code example source: ReactiveX/RxJava

@Test(expected = TestException.class)
public void error() {
  Observable.error(new TestException()).blockingLatest().iterator().next();
}

Code example source: ReactiveX/RxJava

@Test
public void error2() {
  Iterator<Object> it = Observable.error(new TestException()).blockingLatest().iterator();
  for (int i = 0; i < 3; i++) {
    try {
      it.hasNext();
      fail("Should have thrown");
    } catch (TestException ex) {
      // expected
    }
  }
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testSimpleJustNext() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  // take(10) calls onComplete immediately after the 10th item, so onComplete
  // overwrites that item: the first 9 next() calls succeed and the 10th throws
  // NoSuchElementException
  for (int i = 0; i < 10; i++) {
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(Long.valueOf(i), it.next());
  }
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000)
public void testSimple() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  // only 9 items are observed: take(10) calls onComplete immediately after the
  // 10th item, and onComplete overwrites that item
  for (int i = 0; i < 9; i++) {
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(true, it.hasNext());
    Assert.assertEquals(Long.valueOf(i), it.next());
  }
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  Assert.assertEquals(false, it.hasNext());
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000)
public void testSameSourceMultipleIterators() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingLatest();
  for (int j = 0; j < 3; j++) {
    Iterator<Long> it = iter.iterator();
    // only 9 items are observed: take(10) calls onComplete immediately after the
    // 10th item, and onComplete overwrites that item
    for (int i = 0; i < 9; i++) {
      scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
      Assert.assertEquals(true, it.hasNext());
      Assert.assertEquals(Long.valueOf(i), it.next());
    }
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(false, it.hasNext());
  }
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000)
public void testFasterSource() {
  PublishSubject<Integer> source = PublishSubject.create();
  Observable<Integer> blocker = source;
  Iterable<Integer> iter = blocker.blockingLatest();
  Iterator<Integer> it = iter.iterator();
  source.onNext(1);
  Assert.assertEquals(Integer.valueOf(1), it.next());
  source.onNext(2);
  source.onNext(3);
  Assert.assertEquals(Integer.valueOf(3), it.next());
  source.onNext(4);
  source.onNext(5);
  source.onNext(6);
  Assert.assertEquals(Integer.valueOf(6), it.next());
  source.onNext(7);
  source.onComplete();
  Assert.assertEquals(false, it.hasNext());
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void onError() {
  Iterator<Object> it = Observable.never().blockingLatest().iterator();

  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    // The cast relies on the returned iterator also being an Observer
    ((Observer<Object>)it).onError(new TestException());

    TestHelper.assertUndeliverable(errors, 0, TestException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: PacktPublishing/Learning-RxJava

@Test
public void testBlockingLatest() {
  Observable<Long> source =
      Observable.interval(1, TimeUnit.MICROSECONDS)
          .take(1000);

  Iterable<Long> iterable = source.blockingLatest();

  for (Long i: iterable) {
    System.out.println(i);
  }
}

