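This article collects code examples for the Java method io.reactivex.Observable.blockingLatest() and shows how Observable.blockingLatest() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of Observable.blockingLatest() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: blockingLatest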
Returns an Iterable that returns the latest item emitted by this Observable, waiting if necessary for one to become available.
If this Observable produces items faster than Iterator.next takes them, onNext events might be skipped, but onError or onComplete events are not.
Note also that an onNext directly followed by onComplete might hide the onNext event.
Scheduler: blockingLatest does not operate by default on a particular Scheduler.
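The skipping behaviour described above can be modelled as a single-slot handoff between a producer and a consumer. The following is a minimal sketch that uses only the JDK and is not RxJava's source code: the class name LatestSlot and its methods are hypothetical, and it assumes one producer, one consumer, non-null items, and no error signal.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-slot "latest value" iterator; a model, not RxJava's implementation. */
final class LatestSlot<T> implements Iterator<T> {
    /** Marker written into the slot when the producer completes. */
    private static final Object COMPLETE = new Object();

    private final AtomicReference<Object> slot = new AtomicReference<>();
    private final Semaphore signal = new Semaphore(0);
    private Object taken; // signal pulled from the slot but not yet consumed

    // Producer side: every signal overwrites whatever is still in the slot.
    public void onNext(T item) { publish(item); }
    public void onComplete() { publish(COMPLETE); }

    private void publish(Object signalValue) {
        // Release a permit only when the slot was empty, so at most one permit is pending.
        if (slot.getAndSet(signalValue) == null) {
            signal.release();
        }
    }

    @Override
    public boolean hasNext() {
        if (taken == null) {
            signal.acquireUninterruptibly(); // block until the producer publishes something
            taken = slot.getAndSet(null);
        }
        return taken != COMPLETE; // completion is remembered, so later calls keep returning false
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        @SuppressWarnings("unchecked")
        T value = (T) taken;
        taken = null;
        return value;
    }

    public static void main(String[] args) {
        LatestSlot<Integer> slot = new LatestSlot<>();
        slot.onNext(1);
        System.out.println(slot.next());    // prints 1
        slot.onNext(2);
        slot.onNext(3);                     // overwrites 2 before it is read
        System.out.println(slot.next());    // prints 3
        slot.onNext(4);
        slot.onComplete();                  // overwrites 4, so this onNext is hidden
        System.out.println(slot.hasNext()); // prints false
    }
}
```

Because items and the completion marker share one slot, a slow consumer only ever sees the most recent signal. In this model that is why an onNext immediately followed by onComplete can go unobserved, while the terminal signal itself is never lost.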
Code example source: ReactiveX/RxJava
    @Override
    public Integer apply(Integer v) throws Exception {
        Observable.just(1).delay(10, TimeUnit.SECONDS).blockingLatest().iterator().hasNext();
        return v;
    }
})
Code example source: ReactiveX/RxJava
@Test
public void interrupted() {
    Iterator<Object> it = Observable.never().blockingLatest().iterator();
    // Set the interrupt flag so the blocking wait inside hasNext() fails immediately
    Thread.currentThread().interrupt();
    try {
        it.hasNext();
    } catch (RuntimeException ex) {
        assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException);
    }
    // Clear the interrupt flag so it does not leak into other tests
    Thread.interrupted();
}
Code example source: ReactiveX/RxJava
@Test(expected = UnsupportedOperationException.class)
public void remove() {
    Observable.never().blockingLatest().iterator().remove();
}
Code example source: ReactiveX/RxJava
@Test(expected = NoSuchElementException.class)
public void empty() {
    Observable.empty().blockingLatest().iterator().next();
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testEmpty() {
    Observable<Long> source = Observable.<Long> empty();
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    Assert.assertEquals(false, it.hasNext());
    it.next();
}
Code example source: ReactiveX/RxJava
@Test(/* timeout = 1000, */expected = RuntimeException.class)
public void testHasNextThrows() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    it.hasNext();
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000, expected = RuntimeException.class)
public void testNextThrows() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    it.next();
}
Code example source: ReactiveX/RxJava
@Test(expected = TestException.class)
public void error() {
    Observable.error(new TestException()).blockingLatest().iterator().next();
}
Code example source: ReactiveX/RxJava
@Test
public void error2() {
    Iterator<Object> it = Observable.error(new TestException()).blockingLatest().iterator();
    // The error is rethrown on every call, not only the first one
    for (int i = 0; i < 3; i++) {
        try {
            it.hasNext();
            fail("Should have thrown");
        } catch (TestException ex) {
            // expected
        }
    }
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testSimpleJustNext() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    // only 9 values are observable because take(10) calls onComplete immediately
    // on receiving the 10th item, and that onComplete overwrites the previous value;
    // the 10th call to next() therefore throws NoSuchElementException
    for (int i = 0; i < 10; i++) {
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        Assert.assertEquals(Long.valueOf(i), it.next());
    }
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000)
public void testSimple() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    // only 9 because take(10) calls onComplete immediately on receiving the 10th item,
    // and that onComplete overwrites the previous value
    for (int i = 0; i < 9; i++) {
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        Assert.assertEquals(true, it.hasNext());
        Assert.assertEquals(Long.valueOf(i), it.next());
    }
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(false, it.hasNext());
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000)
public void testSameSourceMultipleIterators() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
    Iterable<Long> iter = source.blockingLatest();
    // Each iterator() call subscribes to the source again
    for (int j = 0; j < 3; j++) {
        Iterator<Long> it = iter.iterator();
        // only 9 because take(10) calls onComplete immediately on receiving the 10th item,
        // and that onComplete overwrites the previous value
        for (int i = 0; i < 9; i++) {
            scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
            Assert.assertEquals(true, it.hasNext());
            Assert.assertEquals(Long.valueOf(i), it.next());
        }
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        Assert.assertEquals(false, it.hasNext());
    }
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000)
public void testFasterSource() {
    PublishSubject<Integer> source = PublishSubject.create();
    Observable<Integer> blocker = source;
    Iterable<Integer> iter = blocker.blockingLatest();
    Iterator<Integer> it = iter.iterator();
    source.onNext(1);
    Assert.assertEquals(Integer.valueOf(1), it.next());
    // 2 is skipped: only the latest item is kept
    source.onNext(2);
    source.onNext(3);
    Assert.assertEquals(Integer.valueOf(3), it.next());
    source.onNext(4);
    source.onNext(5);
    source.onNext(6);
    Assert.assertEquals(Integer.valueOf(6), it.next());
    // 7 is hidden by the onComplete that directly follows it
    source.onNext(7);
    source.onComplete();
    Assert.assertEquals(false, it.hasNext());
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void onError() {
    Iterator<Object> it = Observable.never().blockingLatest().iterator();
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        ((Observer<Object>)it).onError(new TestException());
        TestHelper.assertUndeliverable(errors, 0, TestException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: PacktPublishing/Learning-RxJava
@Test
public void testBlockingLatest() {
    Observable<Long> source =
            Observable.interval(1, TimeUnit.MICROSECONDS)
                    .take(1000);
    Iterable<Long> iterable = source.blockingLatest();
    // The source emits faster than the loop consumes, so some values are skipped
    for (Long i : iterable) {
        System.out.println(i);
    }
}