io.reactivex.Observable.combineLatest()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.3k)|赞(0)|评价(0)|浏览(195)

本文整理了Java中io.reactivex.Observable.combineLatest()方法的一些代码示例,展示了Observable.combineLatest()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.combineLatest()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:combineLatest

Observable.combineLatest介绍

[英]Combines nine source ObservableSources by emitting an item that aggregates the latest values of each of the source ObservableSources each time an item is received from any of the source ObservableSources, where this aggregation is defined by a specified function.

If any of the sources never produces an item but only terminates (normally or with an error), the resulting sequence terminates immediately (normally or with all the errors accumulated till that point). If that input source is also synchronous, other sources after it will not be subscribed to.

Scheduler: combineLatest does not operate by default on a particular Scheduler.
[中]通过发出一个项组合九个源ObservableSource,每次从任何源ObservableSource接收到一个项时,该项聚合每个源ObservableSource的最新值,其中该聚合由指定函数定义。
如果任何一个源从未生成一个项,但只终止(正常或有错误),则生成的序列立即终止(正常或所有错误累积到该点)。如果该输入源也是同步的,则不会订阅它之后的其他源。
调度器:默认情况下,CombineTest不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void combineLatestVarargsNull() {
  Observable.combineLatest(new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128, (Observable<Object>[])null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void combineLatestIterableNull() {
  Observable.combineLatest((Iterable<Observable<Object>>)null, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestIterableFunctionNull() {
  Observable.combineLatest(Arrays.asList(just1), null, 128);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * This won't compile if super/extends isn't done correctly on generics.
 */
@Test
public void testCovarianceOfCombineLatest() {
  Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
  Observable<CoolRating> ratings = Observable.just(new CoolRating());
  Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(extendedAction);
  Observable.<Media, Rating, Result> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Observable.<Media, Rating, ExtendedResult> combineLatest(horrors, ratings, combine).blockingForEach(action);
  Observable.<Movie, CoolRating, Result> combineLatest(horrors, ratings, combine);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestIterableFunctionReturnsNull() {
  Observable.combineLatest(Arrays.asList(just1), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return null;
    }
  }, 128).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testCombineLatest3TypesB() {
  Function3<String, Integer, int[], String> combineLatestFunction = getConcatStringIntegerIntArrayCombineLatestFunction();
  /* define an Observer to receive aggregated events */
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> w = Observable.combineLatest(Observable.just("one"), Observable.just(2), Observable.just(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction);
  w.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
  verify(observer, times(1)).onNext("one2[4, 5, 6]");
  verify(observer, times(1)).onNext("one2[7, 8]");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testCombineLatest3TypesA() {
  Function3<String, Integer, int[], String> combineLatestFunction = getConcatStringIntegerIntArrayCombineLatestFunction();
  /* define an Observer to receive aggregated events */
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> w = Observable.combineLatest(Observable.just("one", "two"), Observable.just(2), Observable.just(new int[] { 4, 5, 6 }), combineLatestFunction);
  w.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
  verify(observer, times(1)).onNext("two2[4, 5, 6]");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testCombineLatest2Types() {
  BiFunction<String, Integer, String> combineLatestFunction = getConcatStringIntegerCombineLatestFunction();
  /* define an Observer to receive aggregated events */
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> w = Observable.combineLatest(Observable.just("one", "two"), Observable.just(2, 3, 4), combineLatestFunction);
  w.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
  verify(observer, times(1)).onNext("two2");
  verify(observer, times(1)).onNext("two3");
  verify(observer, times(1)).onNext("two4");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testZeroSources() {
  Observable<Object> result = Observable.combineLatest(
      Collections.<Observable<Object>> emptyList(), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] args) {
      return args;
    }
  });
  Observer<Object> o = TestHelper.mockObserver();
  result.subscribe(o);
  verify(o).onComplete();
  verify(o, never()).onNext(any());
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestVarargsFunctionNull() {
  Observable.combineLatest(null, 128, Observable.never());
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestVarargsFunctionReturnsNull() {
  Observable.combineLatest(new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return null;
    }
  }, 128, just1).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void combineLatestArrayOfSources() {
  Observable.combineLatest(new ObservableSource[] {
      Observable.just(1), Observable.just(2)
  }, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) throws Exception {
      return Arrays.toString(a);
    }
  })
  .test()
  .assertResult("[1, 2]");
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void combineLatestIterableIteratorNull() {
  Observable.combineLatest(new Iterable<Observable<Object>>() {
    @Override
    public Iterator<Observable<Object>> iterator() {
      return null;
    }
  }, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestIterableOneIsNull() {
  Observable.combineLatest(Arrays.asList(Observable.never(), null), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFirstNeverProduces() {
  PublishSubject<Integer> a = PublishSubject.create();
  PublishSubject<Integer> b = PublishSubject.create();
  Observable<Integer> source = Observable.combineLatest(a, b, or);
  Observer<Object> observer = TestHelper.mockObserver();
  InOrder inOrder = inOrder(observer);
  source.subscribe(observer);
  b.onNext(0x10);
  b.onNext(0x20);
  a.onComplete();
  inOrder.verify(observer, times(1)).onComplete();
  verify(observer, never()).onNext(any());
  verify(observer, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestVarargsOneIsNull() {
  Observable.combineLatest(new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128, Observable.never(), null).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSecondNeverProduces() {
  PublishSubject<Integer> a = PublishSubject.create();
  PublishSubject<Integer> b = PublishSubject.create();
  Observable<Integer> source = Observable.combineLatest(a, b, or);
  Observer<Object> observer = TestHelper.mockObserver();
  InOrder inOrder = inOrder(observer);
  source.subscribe(observer);
  a.onNext(0x1);
  a.onNext(0x2);
  b.onComplete();
  a.onComplete();
  inOrder.verify(observer, times(1)).onComplete();
  verify(observer, never()).onNext(any());
  verify(observer, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void combineLatestEmpty() {
  assertSame(Observable.empty(), Observable.combineLatest(new ObservableSource[0], Functions.<Object[]>identity(), 16));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void disposed() {
  TestHelper.checkDisposed(Observable.combineLatest(Observable.never(), Observable.never(), new BiFunction<Object, Object, Object>() {
    @Override
    public Object apply(Object a, Object b) throws Exception {
      return a;
    }
  }));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Observable.combineLatest(Observable.never(), Observable.error(new TestException()), new BiFunction<Object, Object, Object>() {
    @Override
    public Object apply(Object a, Object b) throws Exception {
      return a;
    }
  })
  .test()
  .assertFailure(TestException.class);
}

相关文章

Observable类方法