本文整理了Java中io.reactivex.Observable.reduce()
方法的一些代码示例,展示了Observable.reduce()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.reduce()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:reduce
[英]Returns a Maybe that applies a specified accumulator function to the first item emitted by a source ObservableSource, then feeds the result of that function along with the second item emitted by the source ObservableSource into the same function, and so on until all items have been emitted by the finite source ObservableSource, and emits the final result from the final call to your function as its sole item.
This technique, which is called "reduce" here, is sometimes called "aggregate," "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject method that does a similar operation on lists.
Note that this operator requires the upstream to signal onComplete for the accumulator object to be emitted. Sources that are infinite and never complete will never emit anything through this operator and an infinite source may lead to a fatal OutOfMemoryError. Scheduler: reduce does not operate by default on a particular Scheduler.
[中]返回一个函数,该函数将指定的累加器函数应用于源ObservableSource发出的第一个项,然后将该函数的结果与源ObservableSource发出的第二个项一起提供给同一个函数,依此类推,直到有限源ObservableSource发出所有项,并将函数的最终调用的最终结果作为其唯一项发出。
这种技术在这里被称为“reduce”,在其他编程环境中有时被称为“聚合”、“折叠”、“累积”、“压缩”或“注入”。例如,Groovy有一个inject方法,可以对列表执行类似的操作。
请注意,此运算符要求上游发出信号,通知要发射的累加器对象完成。无限且永远不完整的源永远不会通过该运算符发出任何信息,而无限源可能会导致致命的OutOfMemory错误。调度程序:默认情况下,reduce不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public MaybeSource<Object> apply(Observable<Object> o)
throws Exception {
return o.reduce(new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object a, Object b) throws Exception {
return a;
}
});
}
});
代码示例来源:origin: ReactiveX/RxJava
@Override
public SingleSource<Integer> apply(Observable<Integer> o)
throws Exception {
return o.reduce(0, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer a, Integer b) throws Exception {
return a;
}
});
}
});
代码示例来源:origin: ReactiveX/RxJava
public void libraryFunctionActingOnMovieObservables(Observable<Movie> obs) {
obs.reduce(new BiFunction<Movie, Movie, Movie>() {
@Override
public Movie apply(Movie t1, Movie t2) {
return t2;
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void reduceSeedFunctionNull() {
just1.reduce(1, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void reduceSeedNull() {
just1.reduce(null, new BiFunction<Object, Integer, Object>() {
@Override
public Object apply(Object a, Integer b) {
return 1;
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void reduceFunctionNull() {
just1.reduce(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void reduceMaybeCheckDisposed() {
TestHelper.checkDisposed(Observable.just(new Object()).reduce(new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object a, Object b) throws Exception {
return a;
}
}));
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void reduceFunctionReturnsNull() {
Observable.just(1, 1).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer a, Integer b) {
return null;
}
}).blockingGet();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testAggregateAsIntSumResultSelectorThrows() {
Function<Integer, Integer> error = new Function<Integer, Integer>() {
@Override
public Integer apply(Integer t1) {
throw new TestException();
}
};
Single<Integer> result = Observable.just(1, 2, 3, 4, 5)
.reduce(0, sum).map(error);
result.subscribe(singleObserver);
verify(singleObserver, never()).onSuccess(any());
verify(singleObserver, times(1)).onError(any(TestException.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testAggregateAsIntSum() {
Single<Integer> result = Observable.just(1, 2, 3, 4, 5).reduce(0, sum)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) {
return v;
}
});
result.subscribe(singleObserver);
verify(singleObserver).onSuccess(1 + 2 + 3 + 4 + 5);
verify(singleObserver, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressureWithNoInitialValue() throws InterruptedException {
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
Maybe<Integer> reduced = source.reduce(sum);
Integer r = reduced.blockingGet();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void reduceSeedFunctionReturnsNull() {
just1.reduce(1, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer a, Integer b) {
return null;
}
}).blockingGet();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressureWithInitialValue() throws InterruptedException {
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
Single<Integer> reduced = source.reduce(0, sum);
Integer r = reduced.blockingGet();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 30000)
public void testIssue1527() throws InterruptedException {
//https://github.com/ReactiveX/RxJava/pull/1527
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
Maybe<Integer> reduced = source.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer i1, Integer i2) {
return i1 + i2;
}
});
Integer r = reduced.blockingGet();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void reduceInts() {
Observable<Integer> o = Observable.just(1, 2, 3);
int value = o.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
}).blockingGet();
assertEquals(6, value);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressureWithNoInitialValueObservable() throws InterruptedException {
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
Observable<Integer> reduced = source.reduce(sum).toObservable();
Integer r = reduced.blockingFirst();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testBackpressureWithInitialValueObservable() throws InterruptedException {
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
Observable<Integer> reduced = source.reduce(0, sum).toObservable();
Integer r = reduced.blockingFirst();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 30000)
public void testIssue1527Observable() throws InterruptedException {
//https://github.com/ReactiveX/RxJava/pull/1527
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
Observable<Integer> reduced = source.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer i1, Integer i2) {
return i1 + i2;
}
}).toObservable();
Integer r = reduced.blockingFirst();
assertEquals(21, r.intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void reduceIntsObservable() {
Observable<Integer> o = Observable.just(1, 2, 3);
int value = o.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
}).toObservable().blockingSingle();
assertEquals(6, value);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testReduceWithEmptyObservable() {
Observable<Integer> o = Observable.range(1, 0);
o.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
})
.toObservable()
.test()
.assertResult();
}
内容来源于网络,如有侵权,请联系作者删除!