Using io.reactivex.Observable.doAfterTerminate(), with code examples

Reposted by x33g5p2x on 2022-01-25, filed under Other

This article collects code examples for the Java method io.reactivex.Observable.doAfterTerminate() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects. Details of Observable.doAfterTerminate():
Package: io.reactivex
Class name: Observable
Method name: doAfterTerminate

About Observable.doAfterTerminate

Registers an Action to be called when this ObservableSource invokes either Observer#onComplete or Observer#onError. As the method name indicates, the action runs after the terminal event has been passed on to the downstream observer.

Scheduler: doAfterTerminate does not operate by default on a particular Scheduler.
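
To make the "after" ordering concrete, here is a minimal, dependency-free sketch of how such an operator could wrap an observer: it forwards the terminal event downstream first and only then runs the action. Everything in it is an assumption made for illustration: `AfterTerminateSketch`, `TerminalObserver`, `ThrowingAction`, and the `fallbackHandler` parameter are hypothetical names, not RxJava types, and this is not RxJava's actual implementation. The fallback handler stands in for the global error route that the RxJava tests further down check with TestHelper.assertUndeliverable.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of "run an action after the terminal event"; not RxJava code. */
final class AfterTerminateSketch {

    /** Hypothetical stand-in for an observer's two terminal callbacks. */
    interface TerminalObserver {
        void onComplete();
        void onError(Throwable error);
    }

    /** Hypothetical stand-in for an action that is allowed to throw. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    /**
     * Wraps downstream so that the terminal event is forwarded first and the
     * action runs afterwards. If the action throws, downstream has already
     * terminated, so the failure is handed to fallbackHandler instead.
     */
    static TerminalObserver afterTerminate(TerminalObserver downstream,
                                           ThrowingAction action,
                                           Consumer<Throwable> fallbackHandler) {
        return new TerminalObserver() {
            @Override
            public void onComplete() {
                downstream.onComplete();
                runAction();
            }

            @Override
            public void onError(Throwable error) {
                downstream.onError(error);
                runAction();
            }

            private void runAction() {
                try {
                    action.run();
                } catch (Exception e) {
                    fallbackHandler.accept(e);
                }
            }
        };
    }

    /** Builds a downstream observer that records which terminal callback it received. */
    private static TerminalObserver recording(List<String> log) {
        return new TerminalObserver() {
            @Override public void onComplete() { log.add("downstream onComplete"); }
            @Override public void onError(Throwable error) { log.add("downstream onError"); }
        };
    }

    /** Event order for a source that completes normally. */
    static List<String> traceCompletion() {
        List<String> log = new ArrayList<>();
        afterTerminate(recording(log), () -> log.add("action"), e -> log.add("fallback"))
                .onComplete();
        return log;
    }

    /** Event order for a source that fails while the action itself also throws. */
    static List<String> traceErrorWithCrashingAction() {
        List<String> log = new ArrayList<>();
        afterTerminate(recording(log),
                () -> { throw new IOException("boom"); },
                e -> log.add("fallback " + e.getClass().getSimpleName()))
                .onError(new RuntimeException("source failed"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(traceCompletion());              // [downstream onComplete, action]
        System.out.println(traceErrorWithCrashingAction()); // [downstream onError, fallback IOException]
    }
}
```

In both traces the downstream callback is logged before the action, which is the ordering the description above refers to. With the real library the equivalent call is simply `source.doAfterTerminate(action)`, as the examples below show.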

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doAfterTerminateNull() {
  just1.doAfterTerminate(null);
}

Code example source: ReactiveX/RxJava

private void checkActionCalled(Observable<String> input) {
  input.doAfterTerminate(aAction0).subscribe(observer);
  try {
    verify(aAction0, times(1)).run();
  } catch (Throwable e) {
    throw ExceptionHelper.wrapOrThrow(e);
  }
}

Code example source: ReactiveX/RxJava

}).doAfterTerminate(new Action() {

Code example source: ReactiveX/RxJava

@Test
public void onCompleteAfter() {
  final int[] call = { 0 };
  Observable.just(1)
  .doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
      call[0]++;
    }
  })
  .test()
  .assertResult(1);
  assertEquals(1, call[0]);
}

Code example source: ReactiveX/RxJava

@Test
public void onErrorAfterCrash() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onError(new TestException());
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void onCompleteAfterCrash() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onComplete();
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void onErrorAfterCrashConditional() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onError(new TestException());
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void onCompleteAfterCrashConditional() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onComplete();
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: JessYanCoding/RetrofitUrlManager

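  @Override
  public ObservableSource<T> apply(Observable<T> upstream) {
    return upstream.subscribeOn(Schedulers.io())
        .doOnSubscribe(new Consumer<Disposable>() {
          @Override
          public void accept(Disposable disposable) throws Exception {
            // Show the dialog as soon as the subscription starts.
            mProgressDialog.show();
          }
        })
        // This subscribeOn sits below doOnSubscribe, so the callback above runs on the main thread.
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .doAfterTerminate(new Action() {
          @Override
          public void run() throws Exception {
            // Dismiss the dialog on the main thread once onComplete or onError has been delivered.
            mProgressDialog.dismiss();
          }
        });
  }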
};

Code example source: JessYanCoding/WideEyes

  @Override
  public Observable<T> apply(Observable<T> observable) {
    return observable.subscribeOn(Schedulers.io())
        .doOnSubscribe(new Consumer<Disposable>() {
          @Override
          public void accept(@NonNull Disposable disposable) throws Exception {
            view.showLoading(); // show the progress bar
          }
        })
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .doAfterTerminate(new Action() {
          @Override
          public void run() throws Exception {
            view.hideLoading(); // hide the progress bar
          }
        });
  }
};

Code example source: laizimo/richeditor

.doAfterTerminate(new Action() {
  @Override
  public void run() throws Exception {

Code example source: 0xm1nam0/RxCore

compositeDisposable.add(model.getPage(page)
    .compose(BaseRxActivity().handleResult())
    .doAfterTerminate(() -> {
      if (page == PAGEBEGIN) {
        ptrFrame.refreshComplete();
