Usage and code examples for the io.reactivex.Observable.toFlowable() method

Reposted by x33g5p2x on 2022-01-25, filed under Other

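This article collects code examples for the io.reactivex.Observable.toFlowable() method in Java and shows how Observable.toFlowable() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Observable.toFlowable() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: toFlowable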

Introduction to Observable.toFlowable

Converts the current Observable into a Flowable by applying the specified backpressure strategy.

The available backpressure strategies (the original Javadoc shows a marble diagram for each) are:

  • BackpressureStrategy#BUFFER: buffers all onNext values until the downstream consumes them.
  • BackpressureStrategy#DROP: drops the most recent onNext value if the downstream cannot keep up.
  • BackpressureStrategy#LATEST: keeps only the latest onNext value, overwriting any previous value the downstream has not yet consumed.
  • BackpressureStrategy#ERROR: signals a MissingBackpressureException if the downstream cannot keep up.
  • BackpressureStrategy#MISSING: writes onNext events without any buffering or dropping; the downstream has to deal with any overflow.

Backpressure: The operator applies the chosen backpressure strategy from the BackpressureStrategy enum.
Scheduler: toFlowable does not operate by default on a particular Scheduler.
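To make the difference between the strategies concrete, here is a minimal sketch that converts the same five-item Observable with each strategy and subscribes with limited demand. It assumes RxJava 2.x (the io.reactivex package) is on the classpath; the class name ToFlowableStrategies and the helper run are hypothetical names chosen for illustration, not part of RxJava.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.subscribers.TestSubscriber;

// Hypothetical demo class; assumes RxJava 2.x (io.reactivex) is on the classpath.
public class ToFlowableStrategies {

    // Converts Observable.range(1, 5) with the given strategy and subscribes
    // a TestSubscriber that requests `initialRequest` items up front.
    static TestSubscriber<Integer> run(BackpressureStrategy strategy, long initialRequest) {
        return Observable.range(1, 5)
                .toFlowable(strategy)
                .test(initialRequest);
    }

    public static void main(String[] args) {
        // BUFFER: the 2 requested items are delivered, the rest stay queued.
        System.out.println("BUFFER : " + run(BackpressureStrategy.BUFFER, 2).values());

        // DROP: items arriving without outstanding demand are discarded.
        System.out.println("DROP   : " + run(BackpressureStrategy.DROP, 1).values());

        // LATEST: with no demand only the newest item is retained, and it is
        // delivered once the subscriber requests it.
        TestSubscriber<Integer> latest = run(BackpressureStrategy.LATEST, 0);
        latest.request(1);
        System.out.println("LATEST : " + latest.values());

        // ERROR: overflow terminates the stream with MissingBackpressureException.
        TestSubscriber<Integer> error = run(BackpressureStrategy.ERROR, 1);
        boolean overflowed = error.errorCount() == 1
                && error.errors().get(0) instanceof MissingBackpressureException;
        System.out.println("ERROR  : " + error.values() + ", overflowed=" + overflowed);

        // MISSING: demand is ignored and every item is pushed downstream.
        System.out.println("MISSING: " + run(BackpressureStrategy.MISSING, 0).values());
    }
}
```

Because Observable.range emits synchronously, all five items are produced during subscription, so the initial request amount alone decides which items each strategy lets through. TestSubscriber is used here only because it exposes the received values; in application code an ordinary Subscriber would take its place.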

Code examples

Code example source: ReactiveX/RxJava

@Override
public Flowable<Integer> apply(Observable<Integer> v) {
  // MISSING applies no backpressure handling; the downstream must cope with overflow.
  return v.toFlowable(BackpressureStrategy.MISSING);
}

Code example source: ReactiveX/RxJava

@Override
public Flowable<Integer> apply(Observable<Integer> upstream) {
  return upstream.toFlowable(BackpressureStrategy.MISSING);
}

Code example source: trello/RxLifecycle

@Override
public Publisher<T> apply(Flowable<T> upstream) {
  return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST));
}

Code example source: kaushikgopal/RxJava-Android-Samples

@Override
public View onCreateView(
    LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
  View layout = inflater.inflate(R.layout.fragment_form_validation_comb_latest, container, false);
  unbinder = ButterKnife.bind(this, layout);
  // Skip each field's initial value, then convert to a Flowable that keeps only the latest change.
  _emailChangeObservable =
      RxTextView.textChanges(_email).skip(1).toFlowable(BackpressureStrategy.LATEST);
  _passwordChangeObservable =
      RxTextView.textChanges(_password).skip(1).toFlowable(BackpressureStrategy.LATEST);
  _numberChangeObservable =
      RxTextView.textChanges(_number).skip(1).toFlowable(BackpressureStrategy.LATEST);
  _combineLatestEvents();
  return layout;
}

Code example source: jeasonlzy/okhttp-OkGo

@Override
public Flowable<Response<T>> adapt(Call<T> call, AdapterParam param) {
  ObservableResponse<T> observable = new ObservableResponse<>();
  return observable.adapt(call, param).toFlowable(BackpressureStrategy.LATEST);
}

Code example source: jeasonlzy/okhttp-OkGo

@Override
public Flowable<Result<T>> adapt(Call<T> call, AdapterParam param) {
  ObservableResult<T> observable = new ObservableResult<>();
  return observable.adapt(call, param).toFlowable(BackpressureStrategy.LATEST);
}

Code example source: jeasonlzy/okhttp-OkGo

@Override
public Flowable<T> adapt(Call<T> call, AdapterParam param) {
  ObservableBody<T> observable = new ObservableBody<>();
  return observable.adapt(call, param).toFlowable(BackpressureStrategy.LATEST);
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).toFlowable(BackpressureStrategy.MISSING));
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).toFlowable(BackpressureStrategy.ERROR));
}

Code example source: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(Observable.just(1).toFlowable(BackpressureStrategy.ERROR));
}

Code example source: ReactiveX/RxJava

@Test
public void toFlowableError2() {
  // The subscriber requests all 5 items, so ERROR never sees an overflow.
  Observable.range(1, 5)
      .toFlowable(BackpressureStrategy.ERROR)
      .test(5)
      .assertResult(1, 2, 3, 4, 5);
}

Code example source: square/retrofit

return observable.toFlowable(BackpressureStrategy.LATEST);

Code example source: ReactiveX/RxJava

@Test
public void toFlowableError1() {
  // Only 1 item is requested; the second item overflows and fails the stream.
  Observable.range(1, 5)
      .toFlowable(BackpressureStrategy.ERROR)
      .test(1)
      .assertFailure(MissingBackpressureException.class, 1);
}

Code example source: ReactiveX/RxJava

@Test
public void toFlowableDrop() {
  // Only 1 item is requested; items 2 to 5 are dropped before completion.
  Observable.range(1, 5)
      .toFlowable(BackpressureStrategy.DROP)
      .test(1)
      .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void toFlowableLatest() {
  // Nothing is requested at first, so only the newest item (5) is retained.
  TestSubscriber<Integer> ts = Observable.range(1, 5)
      .toFlowable(BackpressureStrategy.LATEST)
      .test(0);
  ts.request(1);
  ts.assertResult(5);
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Observable.error(new TestException())
      .toFlowable(BackpressureStrategy.MISSING)
      .test()
      .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void toFlowableMissing() {
  // MISSING ignores demand: all 5 items arrive although only 2 are requested.
  TestSubscriber<Integer> ts = Observable.range(1, 5)
      .toFlowable(BackpressureStrategy.MISSING)
      .test(0);

  ts.request(2);
  ts.assertResult(1, 2, 3, 4, 5);
}

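Code example source: ReactiveX/RxJava

@Test
public void toFlowableBuffer() {
  // 2 items are requested and delivered; items 3 to 5 stay buffered, so the stream has not completed yet.
  Observable.range(1, 5)
      .toFlowable(BackpressureStrategy.BUFFER)
      .test(2L)
      .assertValues(1, 2)
      .assertNoErrors()
      .assertNotComplete();
}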

Code example source: spring-projects/spring-framework

@Test
public void emptyBodyWithRxJava2Observable() throws Exception {
  ResolvableType type = httpEntityType(io.reactivex.Observable.class, String.class);
  HttpEntity<io.reactivex.Observable<String>> entity = resolveValueWithEmptyBody(type);
  StepVerifier.create(entity.getBody().toFlowable(BackpressureStrategy.BUFFER))
      .expectNextCount(0)
      .expectComplete()
      .verify();
}

Code example source: spring-projects/spring-framework

void registerAdapters(ReactiveAdapterRegistry registry) {
  registry.registerReactiveType(
      ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
      source -> (io.reactivex.Flowable<?>) source,
      Flowable::fromPublisher
  );
  // Observable has no backpressure of its own, so it is adapted to a Publisher with BUFFER.
  registry.registerReactiveType(
      ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
      source -> ((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER),
      source -> io.reactivex.Flowable.fromPublisher(source).toObservable()
  );
  registry.registerReactiveType(
      ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
      source -> ((io.reactivex.Single<?>) source).toFlowable(),
      source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
  );
  registry.registerReactiveType(
      ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
      source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
      source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement()
  );
  registry.registerReactiveType(
      ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
      source -> ((io.reactivex.Completable) source).toFlowable(),
      source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements()
  );
}
