本文整理了Java中io.reactivex.Observable.unsubscribeOn()
方法的一些代码示例,展示了Observable.unsubscribeOn()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.unsubscribeOn()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:unsubscribeOn
[英]Modifies the source ObservableSource so that subscribers will dispose it on a specified Scheduler.
Scheduler: You specify which Scheduler this operator will use.
[中]修改源ObservableSource,以便订阅者在指定的计划程序上处理它。
调度器:指定该操作员将使用的调度器。
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void unsubscribeOnNull() {
just1.unsubscribeOn(null);
}
代码示例来源:origin: Polidea/RxAndroidBle
/**
* Function that returns an observable that emits {@link Boolean#TRUE} every time the button is being clicked. It enables the button
* whenever the returned Observable is being subscribed and disables it when un-subscribed. Takes care of making interactions with
* the button on the proper thread.
*
* @param button the button to wrap into an Observable
* @return the observable
*/
@NonNull
private static Observable<Boolean> activatedClicksObservable(Button button) {
return Observable.using(
() -> {
button.setEnabled(true);
return button;
},
aView -> RxView.clicks(aView).map(aVoid -> Boolean.TRUE),
aView -> aView.setEnabled(false)
)
.subscribeOn(AndroidSchedulers.mainThread()) // RxView expects to be subscribed on the Main Thread
.unsubscribeOn(AndroidSchedulers.mainThread());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.just(1).unsubscribeOn(Schedulers.single()));
}
代码示例来源:origin: Polidea/RxAndroidBle
@Override
public Observable<ScanResult> call() {
scanPreconditionVerifier.verify();
final ScanSetup scanSetup = scanSetupBuilder.build(scanSettings, scanFilters);
final Operation<RxBleInternalScanResult> scanOperation = scanSetup.scanOperation;
return operationQueue.queue(scanOperation)
.unsubscribeOn(bluetoothInteractionScheduler)
.compose(scanSetup.scanOperationBehaviourEmulatorTransformer)
.map(internalToExternalScanResultMapFunction)
.mergeWith(RxBleClientImpl.this.<ScanResult>bluetoothAdapterOffExceptionObservable());
}
});
代码示例来源:origin: Polidea/RxAndroidBle
.unsubscribeOn(subscribeScheduler)
.subscribe(new Observer<T>() {
@Override
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normal() {
final int[] calls = { 0 };
Observable.just(1)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
calls[0]++;
}
})
.unsubscribeOn(Schedulers.single())
.test()
.assertResult(1);
assertEquals(0, calls[0]);
}
代码示例来源:origin: ReactiveX/RxJava
.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(observer);
代码示例来源:origin: ReactiveX/RxJava
.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(observer);
代码示例来源:origin: Polidea/RxAndroidBle
@Override
public ObservableSource<RxBleConnection> call() throws Exception {
final ConnectionComponent connectionComponent = connectionComponentBuilder
.connectionModule(new ConnectionModule(options))
.build();
final Set<ConnectionSubscriptionWatcher> connSubWatchers = connectionComponent.connectionSubscriptionWatchers();
return obtainRxBleConnection(connectionComponent)
.mergeWith(observeDisconnections(connectionComponent))
.delaySubscription(enqueueConnectOperation(connectionComponent))
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
for (ConnectionSubscriptionWatcher csa : connSubWatchers) {
csa.onConnectionSubscribed();
}
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
for (ConnectionSubscriptionWatcher csa : connSubWatchers) {
csa.onConnectionUnsubscribed();
}
}
})
.subscribeOn(callbacksScheduler)
.unsubscribeOn(callbacksScheduler);
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test
public void error() {
final int[] calls = { 0 };
Observable.error(new TestException())
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
calls[0]++;
}
})
.unsubscribeOn(Schedulers.single())
.test()
.assertFailure(TestException.class);
assertEquals(0, calls[0]);
}
代码示例来源:origin: forkachild/reel-search-android
public static <U> ObservableTransformer<U, U> composeObservable() {
return upstream -> upstream
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.unsubscribeOn(Schedulers.io());
}
代码示例来源:origin: LRH1993/RetrofitRxJavaBox
@Override
public ObservableSource apply(Observable upstream) {
return ((Observable) upstream).subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
代码示例来源:origin: LRH1993/LiveCircle
@Override
public ObservableSource apply(Observable upstream) {
return ((Observable) upstream).subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
代码示例来源:origin: Tophold/FinancialCustomerView
public static Observable call(Observable<?> observable, Observer observer) {
observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
return observable;
}
}
代码示例来源:origin: onlyloveyd/JuheNews
private void commonOp(Observable observable, Observer subscriber) {
observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
}
//在访问HttpMethods时创建单例
代码示例来源:origin: leftcoding/GankLy
public void downloadApk(Consumer<InputStream> next, Observer subscriber) {
mDownloadService.downloadApk()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.map(ResponseBody::byteStream)
.observeOn(Schedulers.io())
.doOnNext(next)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
}
}
代码示例来源:origin: huntermr/FastAndroid
@Override
public <T> void startAsync(Observable<T> observable, Observer<T> observer) {
observable
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(this.<T>bind())
.subscribe(observer);
}
代码示例来源:origin: wzmyyj/ZYMK
public void getSmartSearch(final String key, Observer<SearchBox> observer) {
Gson gson = new GsonBuilder().registerTypeAdapter(SearchBox.class, new SearchBox.Deserializer2()).create();
Retrofit retrofit = ReOk.bind(Urls.ZYMK_BaseApi, gson);
SearchService service = retrofit.create(SearchService.class);
Observable<SearchBox> observable = service.getSmartSearch(key);
observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
}
代码示例来源:origin: wzmyyj/ZYMK
public void getComic(int comic_id, Observer<ComicBox> observer) {
Gson gson = new GsonBuilder().registerTypeAdapter(ComicBox.class, new ComicBox.Deserializer2()).create();
Retrofit retrofit = ReOk.bind(Urls.ZYMK_BaseApi, gson);
ComicService service = retrofit.create(ComicService.class);
Observable<ComicBox> observable = service.getComic(comic_id);
observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
代码示例来源:origin: leftcoding/GankLy
protected <T> Observable<T> toObservable(Observable<T> o) {
return o.retry(3)
.subscribeOn(Schedulers.computation())
.unsubscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());
}
}
内容来源于网络,如有侵权,请联系作者删除!