Usage and code examples of the io.reactivex.Observable.compose() method

Reposted by x33g5p2x on 2022-01-25, category: Other

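This article collects code examples for the Java method io.reactivex.Observable.compose() and shows how Observable.compose() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, Maven and similar platforms, extracted from selected projects, and are intended as reference material. Details of the Observable.compose() method are as follows:
Fully qualified name: io.reactivex.Observable
Class name: Observable
Method name: compose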

Introduction to Observable.compose

Transform an ObservableSource by applying a particular Transformer function to it.

This method operates on the ObservableSource itself, whereas lift operates on the ObservableSource's Observers.

If the operator you are creating is designed to act on the individual items emitted by a source ObservableSource, use lift. If your operator is designed to transform the source ObservableSource as a whole (for instance, by applying a particular set of existing RxJava operators to it), use compose.

Scheduler: compose does not operate by default on a particular Scheduler.
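To make the "transform the source as a whole" case concrete, here is a minimal sketch, assuming RxJava 2.x is on the classpath. The class name `ComposeSketch` and the helper `evenAsText()` are hypothetical names chosen for illustration; they do not come from any of the projects quoted in this article. The sketch packages two existing operators (`filter` and `map`) into one reusable `ObservableTransformer` and checks that applying it through `compose` gives the same result as writing the operators inline.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

import java.util.List;

class ComposeSketch {

    // Hypothetical reusable transformer: keeps even numbers and renders them as text.
    // It receives the whole upstream Observable and returns a new one.
    public static ObservableTransformer<Integer, String> evenAsText() {
        return upstream -> upstream
                .filter(v -> v % 2 == 0)
                .map(v -> "even:" + v);
    }

    public static void main(String[] args) {
        // Apply the packaged operator chain with compose.
        List<String> viaCompose = Observable.range(1, 6)
                .compose(evenAsText())
                .toList()
                .blockingGet();

        // The same operators written inline, for comparison.
        List<String> inline = Observable.range(1, 6)
                .filter(v -> v % 2 == 0)
                .map(v -> "even:" + v)
                .toList()
                .blockingGet();

        System.out.println(viaCompose);                // [even:2, even:4, even:6]
        System.out.println(viaCompose.equals(inline)); // true
    }
}
```

Because the transformer is an ordinary value, the same chain can be shared across several streams without breaking the fluent call style, which is the pattern the RxLifecycle and RxAndroidBle excerpts below rely on.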

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void composeNull() {
  just1.compose(null);
}

Code example source: ReactiveX/RxJava

@Test
public void observableGenericsSignatureTest() {
  A<String, Integer> a = new A<String, Integer>() { };
  Observable.just(a).compose(TransformerTest.<String>testObservableTransformerCreator());
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose() {
  Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
  Observable<Movie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, Movie>() {
    @Override
    public Observable<Movie> apply(Observable<HorrorMovie> t) {
      return Observable.just(new Movie());
    }
  });
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose4() {
  Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
  Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, HorrorMovie>() {
    @Override
    public Observable<HorrorMovie> apply(Observable<HorrorMovie> t1) {
      return t1.map(new Function<HorrorMovie, HorrorMovie>() {
        @Override
        public HorrorMovie apply(HorrorMovie v) {
          return v;
        }
      });
    }
  });
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose2() {
  Observable<Movie> movie = Observable.<Movie> just(new HorrorMovie());
  Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
    @Override
    public Observable<HorrorMovie> apply(Observable<Movie> t) {
      return Observable.just(new HorrorMovie());
    }
  });
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void testCovarianceOfCompose3() {
  Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
  Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
    @Override
    public Observable<HorrorMovie> apply(Observable<Movie> t) {
      return Observable.just(new HorrorMovie()).map(new Function<HorrorMovie, HorrorMovie>() {
        @Override
        public HorrorMovie apply(HorrorMovie v) {
          return v;
        }
      });
    }
  });
}

Code example source: ReactiveX/RxJava

@Test
public void observableTransformerThrows() {
  try {
    Observable.just(1).compose(new ObservableTransformer<Integer, Integer>() {
      @Override
      public Observable<Integer> apply(Observable<Integer> v) {
        throw new TestException("Forced failure");
      }
    });
    fail("Should have thrown!");
  } catch (TestException ex) {
    assertEquals("Forced failure", ex.getMessage());
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testComposeWithDeltaLogic() {
  List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
  List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
  Observable<List<Movie>> movies = Observable.just(list1, list2);
  movies.compose(deltaTransformer);
}

Code example source: ReactiveX/RxJava

@Test
public void testCompose() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.just(1, 2, 3).compose(new ObservableTransformer<Integer, String>() {
    @Override
    public Observable<String> apply(Observable<Integer> t1) {
      return t1.map(new Function<Integer, String>() {
        @Override
        public String apply(Integer v) {
          return String.valueOf(v);
        }
      });
    }
  })
  .subscribe(to);
  to.assertTerminated();
  to.assertNoErrors();
  to.assertValues("1", "2", "3");
}

Code example source: Polidea/RxAndroidBle

private static Observable<PresenterEvent> setupReadingBehaviour(Observable<Boolean> readClicks,
                                BluetoothGattCharacteristic characteristic,
                                RxBleConnection connection) {
  return !hasProperty(characteristic, BluetoothGattCharacteristic.PROPERTY_READ)
      // if the characteristic is not readable return an empty (dummy) observable
      ? Observable.empty()
      : readClicks // else use the readClicks observable from the activity
      // every click is requesting a read operation from the peripheral
      .flatMapSingle(ignoredClick -> connection.readCharacteristic(characteristic))
      .compose(transformToPresenterEvent(Type.READ)); // convenience method to wrap reads
}

Code example source: trello/RxLifecycle

@Test
public void testBindLifecycle() {
  BehaviorSubject<Object> lifecycle = BehaviorSubject.create();
  TestObserver<Object> testObserver = observable.compose(RxLifecycle.bind(lifecycle)).test();
  testObserver.assertNotComplete();
  lifecycle.onNext(new Object());
  testObserver.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void asyncFusedPollCrash() {
  PublishSubject<Integer> ps = PublishSubject.create();
  TestObserver<Integer> to = ps
  .switchMap(Functions.justFunction(
      Observable.range(1, 5)
      .observeOn(ImmediateThinScheduler.INSTANCE)
      .map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer v) throws Exception {
          throw new TestException();
        }
      })
      .compose(TestHelper.<Integer>observableStripBoundary())
  ))
  .test();
  to.assertEmpty();
  ps.onNext(1);
  to
  .assertFailure(TestException.class);
  assertFalse(ps.hasObservers());
}

Code example source: trello/RxLifecycle

@Test
public void testEndsImmediatelyOutsideActivityLifecycle() {
  BehaviorSubject<ActivityEvent> lifecycle = BehaviorSubject.create();
  lifecycle.onNext(ActivityEvent.DESTROY);
  TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroid.bindActivity(lifecycle)).test();
  testObserver.assertComplete();
}

Code example source: trello/RxLifecycle

@Test
public void testBindLifecycleOtherObject() {
  // Ensures it works with other types as well, and not just "Object"
  BehaviorSubject<String> lifecycle = BehaviorSubject.create();
  TestObserver<Object> testObserver = observable.compose(RxLifecycle.bind(lifecycle)).test();
  testObserver.assertNotComplete();
  lifecycle.onNext("");
  testObserver.assertComplete();
}

Code example source: trello/RxLifecycle

@Test
public void testEndsImmediatelyOutsideLifecycle() {
  BehaviorSubject<Lifecycle.Event> lifecycle = BehaviorSubject.create();
  lifecycle.onNext(Lifecycle.Event.ON_DESTROY);
  TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroidLifecycle.bindLifecycle(lifecycle)).test();
  testObserver.assertComplete();
}

Code example source: trello/RxLifecycle

@Test
public void testEndsImmediatelyOutsideFragmentLifecycle() {
  BehaviorSubject<FragmentEvent> lifecycle = BehaviorSubject.create();
  lifecycle.onNext(FragmentEvent.DETACH);
  TestObserver<Object> testObserver = observable.compose(RxLifecycleAndroid.bindFragment(lifecycle)).test();
  testObserver.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void asyncFusedPollCrashDelayError() {
  PublishSubject<Integer> ps = PublishSubject.create();
  TestObserver<Integer> to = ps
  .switchMapDelayError(Functions.justFunction(
      Observable.range(1, 5)
      .observeOn(ImmediateThinScheduler.INSTANCE)
      .map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer v) throws Exception {
          throw new TestException();
        }
      })
      .compose(TestHelper.<Integer>observableStripBoundary())
  ))
  .test();
  to.assertEmpty();
  ps.onNext(1);
  assertTrue(ps.hasObservers());
  to.assertEmpty();
  ps.onComplete();
  to
  .assertFailure(TestException.class);
  assertFalse(ps.hasObservers());
}

Code example source: trello/RxLifecycle

private void testBindUntilEvent(LifecycleOwner owner) {
  Fragment fragment = (Fragment) owner;
  ActivityController<?> controller = startFragment(fragment);
  TestObserver<Object> testObserver = observable.compose(AndroidLifecycle.createLifecycleProvider(owner).bindUntilEvent(Lifecycle.Event.ON_STOP)).test();
  testObserver.assertNotComplete();
  controller.start();
  testObserver.assertNotComplete();
  controller.resume();
  testObserver.assertNotComplete();
  controller.pause();
  testObserver.assertNotComplete();
  controller.stop();
  testObserver.assertComplete();
}

Code example source: trello/RxLifecycle

private void testBindUntilEvent(ActivityController<? extends LifecycleProvider<ActivityEvent>> controller) {
  LifecycleProvider<ActivityEvent> activity = controller.get();
  TestObserver<Object> testObserver = observable.compose(activity.bindUntilEvent(STOP)).test();
  controller.create();
  testObserver.assertNotComplete();
  controller.start();
  testObserver.assertNotComplete();
  controller.resume();
  testObserver.assertNotComplete();
  controller.pause();
  testObserver.assertNotComplete();
  controller.stop();
  testObserver.assertComplete();
}

Code example source: trello/RxLifecycle

private void testBindUntilEvent(ActivityController<? extends LifecycleOwner> controller) {
  LifecycleProvider<Lifecycle.Event> activity = AndroidLifecycle.createLifecycleProvider(controller.get());
  TestObserver<Object> testObserver = observable.compose(activity.bindUntilEvent(Lifecycle.Event.ON_STOP)).test();
  controller.create();
  testObserver.assertNotComplete();
  controller.start();
  testObserver.assertNotComplete();
  controller.resume();
  testObserver.assertNotComplete();
  controller.pause();
  testObserver.assertNotComplete();
  controller.stop();
  testObserver.assertComplete();
}
