本文整理了Java中io.reactivex.Observable.concatMapDelayError()
方法的一些代码示例,展示了Observable.concatMapDelayError()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatMapDelayError()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:concatMapDelayError
[英]Maps each of the items into an ObservableSource, subscribes to them one after the other, one at a time and emits their values in order while delaying any error from either this or any of the inner ObservableSources till all of them terminate.
Scheduler: concatMapDelayError does not operate by default on a particular Scheduler.
[中]将每个项目映射到一个ObservableSource中,一次一个地逐个订阅它们,并按顺序发送它们的值,同时延迟来自该或任何内部ObservableSource的任何错误,直到它们全部终止。
调度程序:默认情况下,concatMapDelayError不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
/**
* Maps each of the items into an ObservableSource, subscribes to them one after the other,
* one at a time and emits their values in order
* while delaying any error from either this or any of the inner ObservableSources
* till all of them terminate.
* <p>
* <img width="640" height="347" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMapDelayError.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources.
* @return the new ObservableSource instance with the concatenation behavior
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMapDelayError(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return concatMapDelayError(mapper, bufferSize(), true);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void concatMapDelayError() {
Observable.just(Observable.just(1), Observable.just(2))
.concatMapDelayError(Functions.<Observable<Integer>>identity())
.test()
.assertResult(1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void concatMapDelayErrorJustSource() {
Observable.just(0)
.concatMapDelayError(new Function<Object, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Object v) throws Exception {
return Observable.just(1);
}
}, 16, true)
.test()
.assertResult(1);
}
代码示例来源:origin: redisson/redisson
/**
* Maps each of the items into an ObservableSource, subscribes to them one after the other,
* one at a time and emits their values in order
* while delaying any error from either this or any of the inner ObservableSources
* till all of them terminate.
* <p>
* <img width="640" height="347" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMapDelayError.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources.
* @return the new ObservableSource instance with the concatenation behavior
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMapDelayError(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return concatMapDelayError(mapper, bufferSize(), true);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fusedPollThrowsDelayError() {
Observable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.range(v, 2);
}
})
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void mapperThrowsDelayError() {
Observable.just(1).hide()
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalDelayErrors() {
Observable.just(1).hide()
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.range(v, 2);
}
})
.test()
.assertResult(1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalDelayErrorsTillTheEnd() {
Observable.just(1).hide()
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.range(v, 2);
}
}, 16, true)
.test()
.assertResult(1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose2() {
TestHelper.checkDisposed(Observable.<Integer>just(1).hide()
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.error(new TestException());
}
}));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void innerErrorDelayError() {
Observable.<Integer>just(1).hide()
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.error(new TestException());
}
})
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void concatMapDelayErrorWithError() {
Observable.just(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())), Observable.just(2))
.concatMapDelayError(Functions.<Observable<Integer>>identity())
.test()
.assertFailure(TestException.class, 1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void innerErrorDelayError2() {
Observable.<Integer>just(1).hide()
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw new TestException();
}
});
}
})
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void concatMapDelayErrorEmptySource() {
assertSame(Observable.empty(), Observable.<Object>empty()
.concatMapDelayError(new Function<Object, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Object v) throws Exception {
return Observable.just(1);
}
}, 16, true));
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Concatenates elements of each ObservableSource provided via an Iterable sequence into a single sequence
* of elements without interleaving them.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common value type of the sources
* @param sources the Iterable sequence of ObservableSources
* @return the new Observable instance
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), bufferSize(), false);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void mainErrorDelayed() {
Observable.<Integer>error(new TestException())
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.range(v, 2);
}
})
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: redisson/redisson
/**
* Concatenates elements of each ObservableSource provided via an Iterable sequence into a single sequence
* of elements without interleaving them.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common value type of the sources
* @param sources the Iterable sequence of ObservableSources
* @return the new Observable instance
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), bufferSize(), false);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void badInnerDelayError() {
@SuppressWarnings("rawtypes")
final Observer[] o = { null };
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Observable.just(1).hide()
.concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
o[0] = observer;
observer.onSubscribe(Disposables.empty());
observer.onComplete();
}
};
}
})
.test()
.assertResult();
o[0].onError(new TestException());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
内容来源于网络,如有侵权,请联系作者删除!