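This article collects code examples for the Java method io.reactivex.Observable.test() and shows how Observable.test() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.test() are as follows: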
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: test
Description: Creates a TestObserver and subscribes it to this Observable. Scheduler: test does not operate by default on a particular Scheduler.
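The description above can be illustrated with a minimal sketch, assuming RxJava 2.x is on the classpath. The class name ObservableTestSketch and the helper doubled() are hypothetical names introduced only for this illustration. Because the source here is synchronous, every signal has already been recorded by the time test() returns, so the assertions can run immediately.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;

import java.util.List;

public class ObservableTestSketch {

    // Hypothetical helper: subscribes a TestObserver via test()
    // and returns the values it recorded.
    static List<Integer> doubled() {
        TestObserver<Integer> to = Observable.just(1, 2, 3)
                .map(v -> v * 2)
                .test(); // creates the TestObserver and subscribes it

        // assertResult checks the values, normal completion, and absence of errors.
        to.assertResult(2, 4, 6);
        return to.values();
    }

    public static void main(String[] args) {
        System.out.println(doubled());
    }
}
```

Returning the TestObserver from test() is what allows the fluent style seen in the examples that follow, where assertions are chained directly onto the call.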
Code example source: ReactiveX/RxJava
@Test
public void concat3() {
    // Concatenating three single-item sources emits 1, 2, 3 in order, then completes.
    Observable.concat(Observable.just(1), Observable.just(2), Observable.just(3))
            .test()
            .assertResult(1, 2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void concatObservableDelayErrorTillEnd() {
    // With tillTheEnd = true, the inner error is held back until all sources have run.
    Observable.concatDelayError(
            Observable.just(Observable.just(1), Observable.just(2),
                    Observable.just(3).concatWith(Observable.<Integer>error(new TestException())),
                    Observable.just(4)), 2, true)
            .test()
            .assertFailure(TestException.class, 1, 2, 3, 4);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void ambIterableOrder() {
    Observable<Integer> error = Observable.error(new RuntimeException());
    // just(1) comes first in the list and signals first, so it wins over the error source.
    Observable.amb(Arrays.asList(Observable.just(1), error))
            .test()
            .assertValue(1)
            .assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void mapperThrowsDelayError() {
    // The mapper throws for the only item, so the sequence fails without emitting values.
    Observable.just(1).hide()
            .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> apply(Integer v) throws Exception {
                    throw new TestException();
                }
            })
            .test()
            .assertFailure(TestException.class);
}
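Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void bufferTimeSkipDefault() {
    // The source completes well before the first one-minute boundary,
    // so all items arrive in a single buffer.
    Observable.range(1, 5).buffer(1, 1, TimeUnit.MINUTES)
            .test()
            .assertResult(Arrays.asList(1, 2, 3, 4, 5));
}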
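Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void restartTimer() {
    // The count limit of 2 closes each buffer; the final argument (restartTimerOnMaxSize)
    // is true, so the one-day timer restarts after each size-triggered emission.
    Observable.range(1, 5)
            .buffer(1, TimeUnit.DAYS, Schedulers.single(), 2, Functions.<Integer>createArrayList(16), true)
            .test()
            .assertResult(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
}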
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void bufferTimedExactEmpty() {
    // An empty source still produces one (empty) buffer when it completes.
    Observable.empty()
            .buffer(1, TimeUnit.DAYS)
            .test()
            .assertResult(Collections.emptyList());
}
Code example source: ReactiveX/RxJava
@Test
public void take() {
    // cache() replays the same items to each subscriber.
    Observable<Integer> cache = Observable.range(1, 5).cache();
    cache.take(2).test().assertResult(1, 2);
    cache.take(3).test().assertResult(1, 2, 3);
}
Code example source: ReactiveX/RxJava
@Test
public void firstOrErrorMultipleElementsObservable() {
    // firstOrError() yields the first item and ignores the rest.
    Observable.just(1, 2, 3)
            .firstOrError()
            .toObservable()
            .test()
            .assertNoErrors()
            .assertValue(1);
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void bufferBoundaryHint() {
    // The second argument is an initial capacity hint for each buffer, not a size limit.
    Observable.range(1, 5).buffer(Observable.timer(1, TimeUnit.MINUTES), 2)
            .test()
            .assertResult(Arrays.asList(1, 2, 3, 4, 5));
}
Code example source: ReactiveX/RxJava
@Test
public void normalDelayBoundary() {
    // Each item t maps to range(t, 2), i.e. t and t + 1; results are concatenated in source order.
    Observable.range(1, 5)
            .concatMapEagerDelayError(new Function<Integer, ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> apply(Integer t) {
                    return Observable.range(t, 2);
                }
            }, false)
            .test()
            .assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
}
Code example source: ReactiveX/RxJava
@Test
public void nextCrash() {
    // CrashingIterable is a helper from RxJava's own test sources;
    // the assertion expects the failure to come from next().
    Maybe.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> apply(Integer v) throws Exception {
            return new CrashingIterable(100, 100, 1);
        }
    })
            .test()
            .assertFailureAndMessage(TestException.class, "next()");
}
Code example source: ReactiveX/RxJava
@Test
public void innerError() {
    // The inner Maybe fails, so the whole sequence fails.
    Observable.just(1)
            .switchMapMaybe(Functions.justFunction(Maybe.error(new TestException())))
            .test()
            .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void elementAtIndex1WithDefaultOnEmptySourceObservable() {
    // An empty source has no element at index 1, so the default value 10 is emitted.
    Observable.empty()
            .elementAt(1, 10)
            .toObservable()
            .test()
            .assertResult(10);
}
Code example source: ReactiveX/RxJava
@Test
public void mainErrorDelayed() {
    // The main source fails without emitting, so the mapper is never invoked.
    Observable.<Integer>error(new TestException())
            .concatMapDelayError(new Function<Integer, ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> apply(Integer v) throws Exception {
                    return Observable.range(v, 2);
                }
            })
            .test()
            .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void timedCancelledUpfront() {
    TestScheduler sch = new TestScheduler();
    // test(true) disposes the TestObserver before subscribing,
    // so advancing the scheduler delivers nothing.
    TestObserver<List<Object>> to = Observable.never()
            .buffer(1, TimeUnit.MILLISECONDS, sch)
            .test(true);
    sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    to.assertEmpty();
}
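The test(true) call in the example above uses the test(boolean dispose) overload, which disposes the TestObserver before subscribing it. A minimal sketch of that behavior, assuming RxJava 2.x on the classpath, might look as follows. The class name DisposedUpfrontSketch and the helper disposedUpfront() are hypothetical names used only for this illustration.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;

public class DisposedUpfrontSketch {

    // Hypothetical helper: subscribes an already-disposed TestObserver.
    static TestObserver<Integer> disposedUpfront() {
        // Passing true disposes the TestObserver before the subscription happens.
        TestObserver<Integer> to = Observable.just(1).test(true);

        // No value, error, or completion should have been recorded.
        to.assertEmpty();
        return to;
    }

    public static void main(String[] args) {
        TestObserver<Integer> to = disposedUpfront();
        System.out.println("disposed=" + to.isDisposed() + ", values=" + to.values().size());
    }
}
```

This form is mainly useful for checking that an operator respects a cancellation that happens before any item is delivered.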
Code example source: ReactiveX/RxJava
@Test
public void mainError() {
    // The main source fails immediately, so the failure is propagated as-is.
    Observable.error(new TestException())
            .concatMapSingle(Functions.justFunction(Single.just(1)))
            .test()
            .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
@SuppressWarnings("unchecked")
public void openClosemainError() {
    // The opening and closing boundaries never signal; only the main error reaches the observer.
    Observable.error(new TestException())
            .buffer(Observable.never(), Functions.justFunction(Observable.never()))
            .test()
            .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void cancelMain() {
    SingleSubject<Integer> ss = SingleSubject.create();
    PublishSubject<Integer> ps = PublishSubject.create();
    TestObserver<Integer> to = ss.flatMapObservable(Functions.justFunction(ps))
            .test();
    // The SingleSubject has not emitted yet, so the inner PublishSubject is not subscribed.
    assertTrue(ss.hasObservers());
    assertFalse(ps.hasObservers());
    to.cancel();
    // Cancelling disposes the subscription to the main source.
    assertFalse(ss.hasObservers());
    assertFalse(ps.hasObservers());
}
Code example source: ReactiveX/RxJava
@Test
public void cancelMain() {
    MaybeSubject<Integer> ms = MaybeSubject.create();
    PublishSubject<Integer> ps = PublishSubject.create();
    TestObserver<Integer> to = ms.flatMapObservable(Functions.justFunction(ps))
            .test();
    // The MaybeSubject has not emitted yet, so the inner PublishSubject is not subscribed.
    assertTrue(ms.hasObservers());
    assertFalse(ps.hasObservers());
    to.cancel();
    // Cancelling disposes the subscription to the main source.
    assertFalse(ms.hasObservers());
    assertFalse(ps.hasObservers());
}