Usage and code examples for io.reactivex.Observable.unsubscribeOn()

Reposted by x33g5p2x on 2022-01-25, filed under Other

This article collects code examples for the Java method io.reactivex.Observable.unsubscribeOn() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects. Details of the method:
Package: io.reactivex
Class: Observable
Method: unsubscribeOn

About Observable.unsubscribeOn

Modifies the source ObservableSource so that subscribers will dispose it on a specified Scheduler. In other words, when a downstream subscriber disposes, the upstream dispose call is executed on the given Scheduler instead of on the thread that requested the disposal.

Scheduler: You specify which Scheduler this operator will use.
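
To make "dispose on a specified Scheduler" more concrete, here is a minimal JDK-only sketch of the idea. It is a simplified model, not RxJava's implementation: `Cancellable`, `cancelOn`, and `runDemo` are hypothetical names introduced for illustration, and a plain `Executor` stands in for an RxJava `Scheduler`.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class UnsubscribeOnSketch {

    /** Hypothetical stand-in for a disposable resource; this is NOT RxJava's Disposable. */
    interface Cancellable {
        void cancel();
    }

    /**
     * Wraps a Cancellable so that cancel() is handed to the given executor
     * instead of running on the caller's thread. Repeated calls are ignored.
     */
    static Cancellable cancelOn(Cancellable upstream, Executor executor) {
        AtomicBoolean cancelled = new AtomicBoolean();
        return () -> {
            if (cancelled.compareAndSet(false, true)) {
                executor.execute(upstream::cancel);
            }
        };
    }

    /** Cancels from the calling thread and returns the name of the thread that ran the cleanup. */
    static String runDemo() {
        // The executor plays the role of the Scheduler passed to unsubscribeOn (an assumption of this sketch).
        ExecutorService cleanupExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "cleanup-thread"));
        AtomicReference<String> ranOn = new AtomicReference<>("not run");
        Cancellable upstream = () -> ranOn.set(Thread.currentThread().getName());

        Cancellable wrapped = cancelOn(upstream, cleanupExecutor);
        wrapped.cancel(); // requested from the calling thread, executed on cleanup-thread
        wrapped.cancel(); // second request is ignored

        cleanupExecutor.shutdown();
        try {
            // Wait for the already submitted cleanup task to finish.
            cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran on: " + runDemo()); // prints "cleanup ran on: cleanup-thread"
    }
}
```

In the sketch the cleanup runs on `cleanup-thread` even though `cancel()` is called from the main thread. That is the same reason the examples below pair unsubscribeOn with a UI or I/O scheduler: the teardown work must run on a particular thread.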

Code examples

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void unsubscribeOnNull() {
  just1.unsubscribeOn(null);
}

Example source: Polidea/RxAndroidBle

/**
 * Function that returns an observable that emits {@link Boolean#TRUE} every time the button is being clicked. It enables the button
 * whenever the returned Observable is being subscribed and disables it when un-subscribed. Takes care of making interactions with
 * the button on the proper thread.
 *
 * @param button the button to wrap into an Observable
 * @return the observable
 */
@NonNull
private static Observable<Boolean> activatedClicksObservable(Button button) {
  return Observable.using(
      () -> {
        button.setEnabled(true);
        return button;
      },
      aView -> RxView.clicks(aView).map(aVoid -> Boolean.TRUE),
      aView -> aView.setEnabled(false)
  )
      .subscribeOn(AndroidSchedulers.mainThread()) // RxView expects to be subscribed on the Main Thread
      .unsubscribeOn(AndroidSchedulers.mainThread());
}

Example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).unsubscribeOn(Schedulers.single()));
}

Example source: Polidea/RxAndroidBle

@Override
public Observable<ScanResult> call() {
  scanPreconditionVerifier.verify();
  final ScanSetup scanSetup = scanSetupBuilder.build(scanSettings, scanFilters);
  final Operation<RxBleInternalScanResult> scanOperation = scanSetup.scanOperation;
  return operationQueue.queue(scanOperation)
      .unsubscribeOn(bluetoothInteractionScheduler)
      .compose(scanSetup.scanOperationBehaviourEmulatorTransformer)
      .map(internalToExternalScanResultMapFunction)
      .mergeWith(RxBleClientImpl.this.<ScanResult>bluetoothAdapterOffExceptionObservable());
}

Example source: Polidea/RxAndroidBle

.unsubscribeOn(subscribeScheduler)
.subscribe(new Observer<T>() {
  @Override

Example source: ReactiveX/RxJava

@Test
public void normal() {
  final int[] calls = { 0 };
  Observable.just(1)
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      calls[0]++;
    }
  })
  .unsubscribeOn(Schedulers.single())
  .test()
  .assertResult(1);
  assertEquals(0, calls[0]);
}

Example source: ReactiveX/RxJava

.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(observer);

Example source: Polidea/RxAndroidBle

@Override
public ObservableSource<RxBleConnection> call() throws Exception {
  final ConnectionComponent connectionComponent = connectionComponentBuilder
      .connectionModule(new ConnectionModule(options))
      .build();
  final Set<ConnectionSubscriptionWatcher> connSubWatchers = connectionComponent.connectionSubscriptionWatchers();
  return obtainRxBleConnection(connectionComponent)
      .mergeWith(observeDisconnections(connectionComponent))
      .delaySubscription(enqueueConnectOperation(connectionComponent))
      .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
          for (ConnectionSubscriptionWatcher csa : connSubWatchers) {
            csa.onConnectionSubscribed();
          }
        }
      })
      .doFinally(new Action() {
        @Override
        public void run() throws Exception {
          for (ConnectionSubscriptionWatcher csa : connSubWatchers) {
            csa.onConnectionUnsubscribed();
          }
        }
      })
      .subscribeOn(callbacksScheduler)
      .unsubscribeOn(callbacksScheduler);
}

Example source: ReactiveX/RxJava

@Test
public void error() {
  final int[] calls = { 0 };
  Observable.error(new TestException())
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      calls[0]++;
    }
  })
  .unsubscribeOn(Schedulers.single())
  .test()
  .assertFailure(TestException.class);
  assertEquals(0, calls[0]);
}

Example source: forkachild/reel-search-android

public static <U> ObservableTransformer<U, U> composeObservable() {
  return upstream -> upstream
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .unsubscribeOn(Schedulers.io());
}

Example source: LRH1993/RetrofitRxJavaBox

@Override
public ObservableSource apply(Observable upstream) {
  return ((Observable) upstream).subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread());
}

Example source: LRH1993/LiveCircle

@Override
public ObservableSource apply(Observable upstream) {
  return ((Observable) upstream).subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread());
}

Example source: Tophold/FinancialCustomerView

public static Observable call(Observable<?> observable, Observer observer) {
  observable.subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(observer);
  return observable;
}

Example source: onlyloveyd/JuheNews

private void commonOp(Observable observable, Observer subscriber) {
  observable.subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(subscriber);
}
// Create the singleton when HttpMethods is accessed

Example source: leftcoding/GankLy

public void downloadApk(Consumer<InputStream> next, Observer subscriber) {
  mDownloadService.downloadApk()
      .subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .map(ResponseBody::byteStream)
      .observeOn(Schedulers.io())
      .doOnNext(next)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(subscriber);
}

Example source: huntermr/FastAndroid

@Override
public <T> void startAsync(Observable<T> observable, Observer<T> observer) {
  observable
      .subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .compose(this.<T>bind())
      .subscribe(observer);
}

Example source: wzmyyj/ZYMK

public void getSmartSearch(final String key, Observer<SearchBox> observer) {
  Gson gson = new GsonBuilder().registerTypeAdapter(SearchBox.class, new SearchBox.Deserializer2()).create();
  Retrofit retrofit = ReOk.bind(Urls.ZYMK_BaseApi, gson);
  SearchService service = retrofit.create(SearchService.class);
  Observable<SearchBox> observable = service.getSmartSearch(key);
  observable.subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(observer);
}

Example source: wzmyyj/ZYMK

public void getComic(int comic_id, Observer<ComicBox> observer) {
  Gson gson = new GsonBuilder().registerTypeAdapter(ComicBox.class, new ComicBox.Deserializer2()).create();
  Retrofit retrofit = ReOk.bind(Urls.ZYMK_BaseApi, gson);
  ComicService service = retrofit.create(ComicService.class);
  Observable<ComicBox> observable = service.getComic(comic_id);
  observable.subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(observer);
}

Example source: leftcoding/GankLy

protected <T> Observable<T> toObservable(Observable<T> o) {
  return o.retry(3)
      .subscribeOn(Schedulers.computation())
      .unsubscribeOn(Schedulers.computation())
      .observeOn(AndroidSchedulers.mainThread());
}
