本文整理了Java中io.reactivex.Observable.concatArrayEager()
方法的一些代码示例,展示了Observable.concatArrayEager()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatArrayEager()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:concatArrayEager
[英]Concatenates a sequence of ObservableSources eagerly into a single stream of values.
Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them in order, each one after the previous one completes. Scheduler: This method does not operate by default on a particular Scheduler.
[中]
代码示例来源:origin: ReactiveX/RxJava
/**
* Concatenates an array of ObservableSources eagerly into a single stream of values.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of ObservableSources that need to be eagerly concatenated
* @return the new ObservableSource instance with the specified concatenation behavior
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources) {
return concatArrayEager(bufferSize(), bufferSize(), sources);
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness2() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source).subscribe(to);
Assert.assertEquals(2, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness5() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source).subscribe(to);
Assert.assertEquals(5, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness7() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source, source, source).subscribe(to);
Assert.assertEquals(7, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness3() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source).subscribe(to);
Assert.assertEquals(3, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness6() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source, source).subscribe(to);
Assert.assertEquals(6, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness4() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source).subscribe(to);
Assert.assertEquals(4, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness8() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source, source, source, source).subscribe(to);
Assert.assertEquals(8, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testEagerness9() {
final AtomicInteger count = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
Observable.concatArrayEager(source, source, source, source, source, source, source, source, source).subscribe(to);
Assert.assertEquals(9, count.get());
to.assertValueCount(count.get());
to.assertNoErrors();
to.assertComplete();
}
代码示例来源:origin: redisson/redisson
/**
* Concatenates a sequence of ObservableSources eagerly into a single stream of values.
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
* in order, each one after the previous one completes.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of ObservableSources that need to be eagerly concatenated
* @return the new ObservableSource instance with the specified concatenation behavior
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources) {
return concatArrayEager(bufferSize(), bufferSize(), sources);
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testInnerError() {
// TODO verify: concatMapEager subscribes first then consumes the sources is okay
PublishSubject<Integer> ps = PublishSubject.create();
Observable.concatArrayEager(Observable.just(1), ps)
.subscribe(to);
ps.onError(new TestException());
to.assertValue(1);
to.assertError(TestException.class);
to.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void testInnerEmpty() {
Observable.concatArrayEager(Observable.empty(), Observable.empty()).subscribe(to);
to.assertNoValues();
to.assertNoErrors();
to.assertComplete();
}
内容来源于网络,如有侵权,请联系作者删除!