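This article collects code examples for the Java method io.reactivex.Observable.subscribeOn() and shows how Observable.subscribeOn() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as practical reference material. Details of Observable.subscribeOn() are as follows: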
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: subscribeOn
Description: Asynchronously subscribes Observers to this ObservableSource on the specified Scheduler.
Scheduler: You specify which Scheduler this operator will use.
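To make "subscribing on a Scheduler" concrete without depending on RxJava, the following is a minimal JDK-only sketch. The names `SubscribeOnSketch`, `MiniSource`, `range`, and `run` are hypothetical and are not part of RxJava, and a plain `Executor` stands in for a Scheduler. It illustrates the idea only (the source's subscribe-time work is moved onto the given thread) and is not how RxJava implements the operator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical, JDK-only illustration; none of these types belong to RxJava.
final class SubscribeOnSketch {

    /** A source that pushes values to a consumer when it is subscribed to. */
    interface MiniSource<T> {
        void subscribe(Consumer<? super T> onNext, Runnable onComplete);

        /** Returns a source whose subscribe-time work runs on the given executor. */
        default MiniSource<T> subscribeOn(Executor executor) {
            Objects.requireNonNull(executor, "executor is null");
            MiniSource<T> upstream = this;
            return (onNext, onComplete) ->
                    executor.execute(() -> upstream.subscribe(onNext, onComplete));
        }
    }

    /** Emits start .. start + count - 1 on whichever thread performs the subscribe call. */
    static MiniSource<Integer> range(int start, int count) {
        return (onNext, onComplete) -> {
            for (int i = start; i < start + count; i++) {
                onNext.accept(i);
            }
            onComplete.run();
        };
    }

    /** Subscribes on a worker thread and records which thread emitted each value. */
    static List<String> run() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        try {
            range(1, 3)
                    .subscribeOn(worker)
                    .subscribe(v -> log.add(Thread.currentThread().getName() + ":" + v),
                            done::countDown);
            done.await(); // block the caller until the worker has finished emitting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [worker-1:1, worker-1:2, worker-1:3]
    }
}
```

Every value is emitted on the worker thread even though `subscribe` was called from the main thread. The sketch also rejects a null executor with a NullPointerException, mirroring the `subscribeOnNull` test among the examples.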
Code example source: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer t) {
    Observable<Integer> o = Observable.just(t)
            .subscribeOn(sch);
    Subject<Integer> subject = UnicastSubject.create();
    o.subscribe(subject);
    return subject;
}
};
Code example source: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer t) {
    return Observable.just(1).subscribeOn(Schedulers.computation());
}
}).subscribe(to);
Code example source: ReactiveX/RxJava
@Override
public Observable<String> apply(Throwable t1) {
    return Observable.just("twoResume", "threeResume").subscribeOn(Schedulers.computation());
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void subscribeOnNull() {
    // A null Scheduler is expected to be rejected with NullPointerException.
    just1.subscribeOn(null);
}
Code example source: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer t1) {
    return composer(Observable.range(t1 * 10, 2), subscriptionCount, m)
            .subscribeOn(Schedulers.computation());
}
}, new BiFunction<Integer, Integer, Integer>() {
Code example source: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer t1) {
    return composer(Observable.range(t1 * 10, 2), subscriptionCount, m)
            .subscribeOn(Schedulers.computation());
}
}, m);
Code example source: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer t) {
    return Observable.range(1, 1000).subscribeOn(Schedulers.computation());
}
}).observeOn(Schedulers.newThread()).subscribe(to);
Code example source: ReactiveX/RxJava
@Test(timeout = 5000)
public void testTake() throws Exception {
    List<Observable<Integer>> sourceList = new ArrayList<Observable<Integer>>(3);
    sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));
    sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));
    sourceList.add(Observable.range(0, 100000).subscribeOn(Schedulers.io()));
    TestObserver<Integer> to = new TestObserver<Integer>();
    // Merge with a maximum concurrency of 2 and stop after the first 5 items.
    Observable.merge(sourceList, 2).take(5).subscribe(to);
    to.awaitTerminalEvent();
    to.assertNoErrors();
    to.assertValueCount(5);
}
}
Code example source: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer i) {
    return Observable.range(1, innerSize).subscribeOn(Schedulers.computation());
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Observable.just(1).subscribeOn(Schedulers.single()));
}
}
Code example source: ReactiveX/RxJava
@Test(/* timeout = 1000, */expected = RuntimeException.class)
public void testHasNextThrows() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    it.hasNext();
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000, expected = RuntimeException.class)
public void testNextThrows() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    it.next();
}
Code example source: ReactiveX/RxJava
@Override
public CompletableSource apply(Integer v) throws Exception {
    return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
}
}).toObservable()
Code example source: ReactiveX/RxJava
@Override
public CompletableSource apply(Integer v) throws Exception {
    return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
}
})
Code example source: ReactiveX/RxJava
@Test
public void blockingSubscribeConsumerConsumer() {
    final List<Object> list = new ArrayList<Object>();
    Observable.range(1, 5)
            .subscribeOn(Schedulers.computation())
            .blockingSubscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer v) throws Exception {
                    list.add(v);
                }
            }, Functions.emptyConsumer());
    assertEquals(Arrays.asList(1, 2, 3, 4, 5), list);
}
Code example source: ReactiveX/RxJava
@Test
public void blockingFirstDefault() {
    // The source is empty, so blockingFirst(1) falls back to the default value 1.
    assertEquals(1, Observable.<Integer>empty()
            .subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}
Code example source: ReactiveX/RxJava
@Test
public void blockingFirst() {
    assertEquals(1, Observable.range(1, 10)
            .subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}
Code example source: ReactiveX/RxJava
@Test(timeout = 20000)
public void testNoStackOverFlow() {
    Observable.just(1).repeat().subscribeOn(Schedulers.newThread()).take(100000).blockingLast();
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatTake() {
    Observable<Integer> xs = Observable.just(1, 2);
    Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
    assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}
Code example source: ReactiveX/RxJava
@Test
public void testOnError() {
    TestObserver<String> to = new TestObserver<String>();
    Observable.unsafeCreate(new ObservableSource<String>() {
        @Override
        public void subscribe(Observer<? super String> observer) {
            observer.onSubscribe(Disposables.empty());
            observer.onError(new RuntimeException("fail"));
        }
    }).subscribeOn(Schedulers.computation()).subscribe(to);
    to.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
    to.assertTerminated();
}