Usage and code examples of the io.reactivex.Observable.zip() method

Reposted by x33g5p2x on 2022-01-25, filed under Other

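This article collects code examples of the io.reactivex.Observable.zip() method in Java and shows how Observable.zip() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Observable.zip() method are as follows:
Fully qualified name: io.reactivex.Observable
Class name: Observable
Method name: zip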

Introduction to Observable.zip

Returns an Observable that emits the results of a specified combiner function applied to combinations of nine items emitted, in sequence, by nine other ObservableSources.

zip applies this function in strict sequence, so the first item emitted by the new ObservableSource will be the result of the function applied to the first item emitted by each source ObservableSource, the second item emitted by the new ObservableSource will be the result of the function applied to the second item emitted by each of those ObservableSources, and so forth.

The resulting ObservableSource returned from zip will invoke Observer#onNext as many times as the number of onNext invocations of the source ObservableSource that emits the fewest items.
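
The pairing rule and the item count described above can be illustrated without RxJava. The following is a minimal plain-Java sketch, assuming two in-memory lists stand in for the sources. The class `ZipModel` and its `zip` method are hypothetical names introduced here for illustration and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration class; not part of RxJava.
final class ZipModel {

    // Pairs items by position and stops at the end of the shorter list,
    // mirroring zip's "strict sequence" and "fewest items" rules.
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int count = Math.min(first.size(), second.size());
        List<R> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(combiner.apply(first.get(i), second.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> zipped =
                zip(Arrays.asList("one", "two"), Arrays.asList(2, 3, 4), (s, n) -> s + n);
        System.out.println(zipped); // prints [one2, two3]; the unmatched 4 is dropped
    }
}
```

The inputs are the same values used by the testStart2Types example further down, which likewise expects "one2" and "two3" and never sees a value for 4.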

The operator subscribes to its sources in the order they are specified and completes eagerly if one of the sources is shorter than the rest, disposing the other sources. Therefore, it is possible that those other sources will never be able to run to completion (and thus will not call doOnComplete()). This can also happen if the sources are exactly the same length: if source A completes and B has been consumed and is about to complete, the operator detects that A won't be sending further values and disposes B immediately. For example:

zip(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2), ..., (a, b, c, d, e, f, g, h, i) -> a + b)

action1 will be called but action2 won't.
To work around this termination property, use #doOnDispose(Action) as well, or use using() to do cleanup in case of completion or a dispose() call.

Scheduler: zip does not operate by default on a particular Scheduler.
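
The eager-termination behaviour described above can be pictured with a simplified pull-based model. This is only a sketch in plain Java under stated assumptions: RxJava itself is push-based and is implemented differently, and `EagerZipModel`, `Source`, and `run` are hypothetical names that do not exist in the library. The log entries "completed" and "disposed" stand in for what doOnComplete and doOnDispose callbacks would observe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical pull-based model of zip's eager termination; not RxJava code.
final class EagerZipModel {

    // A source that records whether it was allowed to complete or was disposed first.
    static final class Source {
        private final String name;
        private final Iterator<Integer> items;
        private final List<String> log;
        private boolean completed;

        Source(String name, List<Integer> items, List<String> log) {
            this.name = name;
            this.items = items.iterator();
            this.log = log;
        }

        // Returns the next item, or null once the source has completed.
        Integer next() {
            if (items.hasNext()) {
                return items.next();
            }
            completed = true;
            log.add(name + " completed"); // plays the role of doOnComplete
            return null;
        }

        // Cancels the source; only logged if it had not completed yet.
        void dispose() {
            if (!completed) {
                log.add(name + " disposed"); // plays the role of doOnDispose
            }
        }
    }

    // Sums paired items until either source completes, then disposes the other one.
    static List<String> run(List<Integer> first, List<Integer> second) {
        List<String> log = new ArrayList<>();
        Source a = new Source("A", first, log);
        Source b = new Source("B", second, log);
        while (true) {
            Integer x = a.next();
            if (x == null) {
                b.dispose();
                break;
            }
            Integer y = b.next();
            if (y == null) {
                a.dispose();
                break;
            }
            log.add("emit " + (x + y));
        }
        return log;
    }

    public static void main(String[] args) {
        // Same values as range(1, 5) and range(6, 5) in the example above.
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), Arrays.asList(6, 7, 8, 9, 10)));
        // prints [emit 7, emit 9, emit 11, emit 13, emit 15, A completed, B disposed]
    }
}
```

In this model B never logs "completed" even though both inputs have the same length, which is why cleanup that must always run belongs in a dispose-aware hook rather than only in a completion callback.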

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipObservableNull() {
  Observable.zip((Observable<Observable<Object>>)null, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) {
      return 1;
    }
  });
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipIterableNull() {
  Observable.zip((Iterable<Observable<Object>>)null, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  });
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void zipIterableFunctionNull() {
  Observable.zip(Arrays.asList(just1, just1), null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipObservableFunctionNull() {
  Observable.zip((Observable.just(just1)), null);
}

Code example source: ReactiveX/RxJava

/**
 * This won't compile if super/extends isn't done correctly on generics.
 */
@Test
public void testCovarianceOfZip() {
  Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
  Observable<CoolRating> ratings = Observable.just(new CoolRating());
  Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
  Observable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
  Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
  Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.zip(Observable.just(1), Observable.just(1), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  }));
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void zipIterableFunctionReturnsNull() {
  Observable.zip(Arrays.asList(just1, just1), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) {
      return null;
    }
  }).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipObservableFunctionReturnsNull() {
  Observable.zip((Observable.just(just1)), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) {
      return null;
    }
  }).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void testOnNextExceptionInvokesOnError() {
  BiFunction<Integer, Integer, Integer> zipr = getDivideZipr();
  Observer<Integer> observer = TestHelper.mockObserver();
  Observable<Integer> w = Observable.zip(Observable.just(10, 20, 30), Observable.just(0, 1, 2), zipr);
  w.subscribe(observer);
  verify(observer, times(1)).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testStart3Types() {
  Function3<String, Integer, int[], String> zipr = getConcatStringIntegerIntArrayZipr();
  /* define an Observer to receive aggregated events */
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> w = Observable.zip(Observable.just("one", "two"), Observable.just(2), Observable.just(new int[] { 4, 5, 6 }), zipr);
  w.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
  verify(observer, times(1)).onNext("one2[4, 5, 6]");
  verify(observer, never()).onNext("two");
}

Code example source: ReactiveX/RxJava

@Test
public void zip5() {
  Observable.zip(Observable.just(1),
      Observable.just(2), Observable.just(3),
      Observable.just(4), Observable.just(5),
    new Function5<Integer, Integer, Integer, Integer, Integer, Object>() {
      @Override
      public Object apply(Integer a, Integer b, Integer c, Integer d, Integer e) throws Exception {
        return "" + a + b + c + d + e;
      }
    }
  )
  .test()
  .assertResult("12345");
}

Code example source: ReactiveX/RxJava

@Test
public void zip4() {
  Observable.zip(Observable.just(1),
      Observable.just(2), Observable.just(3),
      Observable.just(4),
    new Function4<Integer, Integer, Integer, Integer, Object>() {
      @Override
      public Object apply(Integer a, Integer b, Integer c, Integer d) throws Exception {
        return "" + a + b + c + d;
      }
    }
  )
  .test()
  .assertResult("1234");
}

Code example source: ReactiveX/RxJava

@Test
public void testStart2Types() {
  BiFunction<String, Integer, String> zipr = getConcatStringIntegerZipr();
  /* define an Observer to receive aggregated events */
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> w = Observable.zip(Observable.just("one", "two"), Observable.just(2, 3, 4), zipr);
  w.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
  verify(observer, times(1)).onNext("one2");
  verify(observer, times(1)).onNext("two3");
  verify(observer, never()).onNext("4");
}

Code example source: ReactiveX/RxJava

@Test
public void zip3() {
  Observable.zip(Observable.just(1),
      Observable.just(2), Observable.just(3),
    new Function3<Integer, Integer, Integer, Object>() {
      @Override
      public Object apply(Integer a, Integer b, Integer c) throws Exception {
        return "" + a + b + c;
      }
    }
  )
  .test()
  .assertResult("123");
}

Code example source: ReactiveX/RxJava

@Test
public void zipArrayMany() {
  @SuppressWarnings("unchecked")
  Observable<Integer>[] arr = new Observable[10];
  Arrays.fill(arr, Observable.just(1));
  Observable.zip(Arrays.asList(arr), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) throws Exception {
      return Arrays.toString(a);
    }
  })
  .test()
  .assertResult("[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]");
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipIterableIteratorNull() {
  Observable.zip(new Iterable<Observable<Object>>() {
    @Override
    public Iterator<Observable<Object>> iterator() {
      return null;
    }
  }, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void zip2() {
  Observable.zip(Observable.just(1),
      Observable.just(2),
    new BiFunction<Integer, Integer, Object>() {
      @Override
      public Object apply(Integer a, Integer b) throws Exception {
        return "" + a + b;
      }
    }
  )
  .test()
  .assertResult("12");
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testCollectionSizeDifferentThanFunction() {
  Function<Object[], String> zipr = Functions.toFunction(getConcatStringIntegerIntArrayZipr());
  //Function3<String, Integer, int[], String>
  /* define an Observer to receive aggregated events */
  Observer<String> observer = TestHelper.mockObserver();
  @SuppressWarnings("rawtypes")
  Collection ws = java.util.Collections.singleton(Observable.just("one", "two"));
  Observable<String> w = Observable.zip(ws, zipr);
  w.subscribe(observer);
  verify(observer, times(1)).onError(any(Throwable.class));
  verify(observer, never()).onComplete();
  verify(observer, never()).onNext(any(String.class));
}

Code example source: ReactiveX/RxJava

@Before
public void setUp() {
  concat2Strings = new BiFunction<String, String, String>() {
    @Override
    public String apply(String t1, String t2) {
      return t1 + "-" + t2;
    }
  };
  s1 = PublishSubject.create();
  s2 = PublishSubject.create();
  zipped = Observable.zip(s1, s2, concat2Strings);
  observer = TestHelper.mockObserver();
  inOrder = inOrder(observer);
  zipped.subscribe(observer);
}

Code example source: ReactiveX/RxJava

@Test
public void zip2DelayError() {
  Observable.zip(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())),
      Observable.just(2),
    new BiFunction<Integer, Integer, Object>() {
      @Override
      public Object apply(Integer a, Integer b) throws Exception {
        return "" + a + b;
      }
    }, true
  )
  .test()
  .assertFailure(TestException.class, "12");
}
