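This article collects code examples for the io.reactivex.Observable.zipWith() method in Java and shows how Observable.zipWith() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be a useful reference. Details of the Observable.zipWith() method are as follows: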
Package path: io.reactivex.Observable
Class name: Observable
Method name: zipWith
Returns an Observable that emits items that are the result of applying a specified function to pairs of values, one each from the source ObservableSource and another specified ObservableSource.
The operator subscribes to its sources in the order they are specified and completes eagerly if one of the sources is shorter than the rest, disposing the other sources. Therefore, those other sources may never run to completion (and thus never call doOnComplete()). This can also happen if the sources are exactly the same length: if source A completes and B has been consumed and is about to complete, the operator detects that A won't send further values and disposes B immediately. For example:
range(1, 5).doOnComplete(action1).zipWith(range(6, 5).doOnComplete(action2), (a, b) -> a + b)
action1 will be called but action2 won't.
To work around this termination property, use doOnDispose(Action) as well, or use using() to do cleanup in case of completion or a dispose() call.
Scheduler: zipWith does not operate by default on a particular Scheduler.
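As a rough illustration of the pairing rule described above, the following plain-Java sketch zips two Iterables and stops as soon as either one is exhausted. It does not use RxJava and is not how the library implements the operator. The class name ZipWithSketch and its static zipWith helper are hypothetical names introduced only for this example. It models only the pairing and the shortest-source cutoff, not subscription, disposal, or doOnComplete callbacks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of pairwise zipping; NOT RxJava's implementation.
// ZipWithSketch and zipWith(...) are hypothetical names for illustration.
final class ZipWithSketch {

    // Combines the i-th elements of both inputs until either input runs out.
    static <A, B, R> List<R> zipWith(Iterable<A> first, Iterable<B> second,
                                     BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            out.add(combiner.apply(a.next(), b.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Same values as range(1, 5).zipWith(range(6, 5), (a, b) -> a + b)
        List<Integer> sums = zipWith(Arrays.asList(1, 2, 3, 4, 5),
                Arrays.asList(6, 7, 8, 9, 10), (x, y) -> x + y);
        System.out.println(sums); // [7, 9, 11, 13, 15]

        // The shorter input decides how many pairs are produced.
        List<String> pairs = zipWith(Arrays.asList(1, 2, 3),
                Arrays.asList("a", "b"), (n, s) -> n + "-" + s);
        System.out.println(pairs); // [1-a, 2-b]
    }
}
```

In the second call the third element of the longer input is never paired, which is the same cutoff that, in the real operator, leads to the longer source being disposed instead of completing.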
Code example source: ReactiveX/RxJava
@Override
public Observable<?> apply(Observable<? extends Throwable> attempt) {
    return attempt.zipWith(Observable.just(1), new BiFunction<Throwable, Integer, Void>() {
        @Override
        public Void apply(Throwable o, Integer integer) {
            return null;
        }
    });
}
})
Code example source: ReactiveX/RxJava
@Override
public ObservableSource<Object> apply(Observable<Integer> o) throws Exception {
    return o.zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) throws Exception {
            return a + b;
        }
    });
}
});
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableNull() {
    // A null Iterable argument is rejected with a NullPointerException.
    just1.zipWith((Iterable<Integer>)null, new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) {
            return 1;
        }
    });
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithObservableNull() {
    // A null Observable argument is rejected with a NullPointerException.
    just1.zipWith((Observable<Integer>)null, new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) {
            return 1;
        }
    });
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableCombinerNull() {
    just1.zipWith(Arrays.asList(1), null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithCombinerNull() {
    just1.zipWith(just1, null);
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Observable.just(1).zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) throws Exception {
            return a + b;
        }
    }));
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableOneIsNull() {
    // The Iterable yields null as its second element, so the test expects a NullPointerException.
    Observable.just(1, 2).zipWith(Arrays.asList(1, null), new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) {
            return 1;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableCombinerReturnsNull() {
    // The combiner returns null, so the test expects a NullPointerException.
    just1.zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) {
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test
public void testTake2() {
    Observable<Integer> o = Observable.just(1, 2, 3, 4, 5);
    Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
    SquareStr squareStr = new SquareStr();
    o.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
    // take(2) ends the sequence early, so the mapper is expected to run only twice.
    assertEquals(2, squareStr.counter.get());
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithCombinerReturnsNull() {
    just1.zipWith(just1, new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) {
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test
public void zipWithDelayError() {
    Observable.just(1)
    .zipWith(Observable.just(2), new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer a, Integer b) throws Exception {
            return a + b;
        }
    }, true) // third argument: delayError
    .test()
    .assertResult(3);
}
Code example source: ReactiveX/RxJava
@Test
public void zipWithDelayErrorBufferSize() {
    Observable.just(1)
    .zipWith(Observable.just(2), new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer a, Integer b) throws Exception {
            return a + b;
        }
    }, true, 16) // delayError = true, bufferSize = 16
    .test()
    .assertResult(3);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void zipWithIterableIteratorNull() {
    // The Iterable hands back a null Iterator, so the test expects a NullPointerException.
    just1.zipWith(new Iterable<Object>() {
        @Override
        public Iterator<Object> iterator() {
            return null;
        }
    }, new BiFunction<Integer, Object, Object>() {
        @Override
        public Object apply(Integer a, Object b) {
            return 1;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test
public void testStart() {
    Observable<String> os = OBSERVABLE_OF_5_INTEGERS
            .zipWith(OBSERVABLE_OF_5_INTEGERS, new BiFunction<Integer, Integer, String>() {
                @Override
                public String apply(Integer a, Integer b) {
                    return a + "-" + b;
                }
            });
    final ArrayList<String> list = new ArrayList<String>();
    os.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println(s);
            list.add(s);
        }
    });
    assertEquals(5, list.size());
    assertEquals("1-1", list.get(0));
    assertEquals("2-2", list.get(1));
    assertEquals("5-5", list.get(4));
}
Code example source: ReactiveX/RxJava
@Test
public void iteratorThrows() {
    Observable.just(1).zipWith(new CrashingIterable(100, 1, 100), new BiFunction<Integer, Integer, Object>() {
        @Override
        public Object apply(Integer a, Integer b) throws Exception {
            return a + b;
        }
    })
    .test()
    .assertFailureAndMessage(TestException.class, "hasNext()");
}
Code example source: ReactiveX/RxJava
final CountDownLatch infiniteObservable = new CountDownLatch(1);
Observable<String> os = OBSERVABLE_OF_5_INTEGERS
        .zipWith(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservable), new BiFunction<Integer, Integer, String>() {
Code example source: ReactiveX/RxJava
@Test
public void testStartAsync() throws InterruptedException {
    Observable<String> os = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1))
            .zipWith(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)), new BiFunction<Integer, Integer, String>() {
                @Override
                public String apply(Integer a, Integer b) {
                    return a + "-" + b;
                }
            }).take(5);
    TestObserver<String> to = new TestObserver<String>();
    os.subscribe(to);
    to.awaitTerminalEvent();
    to.assertNoErrors();
    assertEquals(5, to.valueCount());
    assertEquals("1-1", to.values().get(0));
    assertEquals("2-2", to.values().get(1));
    assertEquals("5-5", to.values().get(4));
}
Code example source: ReactiveX/RxJava
@Test
public void bufferedCanCompleteIfOpenNeverCompletesOverlapping() {
    Observable.range(1, 50)
    // Pair each integer with an interval tick and keep only the integer.
    .zipWith(Observable.interval(5, TimeUnit.MILLISECONDS),
            new BiFunction<Integer, Long, Integer>() {
                @Override
                public Integer apply(Integer integer, Long aLong) {
                    return integer;
                }
            })
    .buffer(Observable.interval(0, 100, TimeUnit.MILLISECONDS),
            new Function<Long, Observable<?>>() {
                @Override
                public Observable<?> apply(Long a) {
                    return Observable.just(a).delay(200, TimeUnit.MILLISECONDS);
                }
            })
    .test()
    .assertSubscribed()
    .awaitDone(3, TimeUnit.SECONDS)
    .assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void bufferedCanCompleteIfOpenNeverCompletesDropping() {
    Observable.range(1, 50)
    // Pair each integer with an interval tick and keep only the integer.
    .zipWith(Observable.interval(5, TimeUnit.MILLISECONDS),
            new BiFunction<Integer, Long, Integer>() {
                @Override
                public Integer apply(Integer integer, Long aLong) {
                    return integer;
                }
            })
    .buffer(Observable.interval(0, 200, TimeUnit.MILLISECONDS),
            new Function<Long, Observable<?>>() {
                @Override
                public Observable<?> apply(Long a) {
                    return Observable.just(a).delay(100, TimeUnit.MILLISECONDS);
                }
            })
    .test()
    .assertSubscribed()
    .awaitDone(3, TimeUnit.SECONDS)
    .assertComplete();
}