Usage and code examples of the io.reactivex.Observable.zipWith() method

x33g5p2x, reposted on 2022-01-25 under Other

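This article collects code examples of the io.reactivex.Observable.zipWith() method in Java and shows how Observable.zipWith() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.zipWith() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: zipWith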

Introduction to Observable.zipWith

Returns an Observable that emits items that are the result of applying a specified function to pairs of values, one each from the source ObservableSource and another specified ObservableSource.

The operator subscribes to its sources in the order they are specified and completes eagerly if one of the sources is shorter than the rest, disposing the other sources. It is therefore possible that those other sources will never run to completion (and thus will not call doOnComplete()). This can also happen if the sources are exactly the same length: if source A completes and B has been consumed and is about to complete, the operator detects that A won't be sending further values and disposes B immediately. For example:

range(1, 5).doOnComplete(action1).zipWith(range(6, 5).doOnComplete(action2), (a, b) -> a + b)

action1 will be called but action2 won't.
To work around this termination property, also use doOnDispose(Action), or use using() to do cleanup in case of completion or a dispose() call.

Scheduler: zipWith does not operate by default on a particular Scheduler.
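The pairing rule described above can be illustrated without RxJava at all. The following is a minimal sketch in plain Java, assuming a hypothetical helper named zip in a hypothetical class ZipSketch (neither is part of RxJava). It models only the pairwise combination and the "stop at the shorter input" rule; it does not model subscription, disposal, or the doOnComplete() behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {
    // Hypothetical helper, not an RxJava API: combines items pairwise and
    // stops as soon as either input has no more items.
    static <A, B, R> List<R> zip(Iterable<A> first, Iterable<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            out.add(combiner.apply(a.next(), b.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(1, 2, 3, 4, 5);   // stands in for range(1, 5)
        List<Integer> right = Arrays.asList(6, 7, 8, 9, 10); // stands in for range(6, 5)

        // Equal lengths: every item finds a partner.
        List<Integer> sums = zip(left, right, (x, y) -> x + y);
        System.out.println(sums);  // prints [7, 9, 11, 13, 15]

        // Unequal lengths: the result is as long as the shorter input.
        List<String> pairs = zip(left, Arrays.asList("a", "b"), (x, s) -> x + s);
        System.out.println(pairs); // prints [1a, 2b]
    }
}
```

In the second call the items 3, 4, and 5 are never combined, which mirrors why a longer source in zipWith may be disposed before it completes.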

Code examples

Code example source: ReactiveX/RxJava

// Method of an anonymous class; the enclosing call is elided in the source.
@Override
public Observable<?> apply(Observable<? extends Throwable> attempt) {
  return attempt.zipWith(Observable.just(1), new BiFunction<Throwable, Integer, Void>() {
    @Override
    public Void apply(Throwable o, Integer integer) {
      return null;
    }
  });
}
})

Code example source: ReactiveX/RxJava

// Method of an anonymous class; the enclosing call is elided in the source.
@Override
public ObservableSource<Object> apply(Observable<Integer> o) throws Exception {
  return o.zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  });
}
});

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableNull() {
  just1.zipWith((Iterable<Integer>)null, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return 1;
    }
  });
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithObservableNull() {
  just1.zipWith((Observable<Integer>)null, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return 1;
    }
  });
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableCombinerNull() {
  just1.zipWith(Arrays.asList(1), null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithCombinerNull() {
  just1.zipWith(just1, null);
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.just(1).zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  }));
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableOneIsNull() {
  Observable.just(1, 2).zipWith(Arrays.asList(1, null), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return 1;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableCombinerReturnsNull() {
  just1.zipWith(Arrays.asList(1), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void testTake2() {
  Observable<Integer> o = Observable.just(1, 2, 3, 4, 5);
  Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
  SquareStr squareStr = new SquareStr();
  o.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
  assertEquals(2, squareStr.counter.get());
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithCombinerReturnsNull() {
  just1.zipWith(just1, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void zipWithDelayError() {
  Observable.just(1)
  .zipWith(Observable.just(2), new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  }, true)
  .test()
  .assertResult(3);
}

Code example source: ReactiveX/RxJava

@Test
public void zipWithDelayErrorBufferSize() {
  Observable.just(1)
  .zipWith(Observable.just(2), new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  }, true, 16)
  .test()
  .assertResult(3);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void zipWithIterableIteratorNull() {
  just1.zipWith(new Iterable<Object>() {
    @Override
    public Iterator<Object> iterator() {
      return null;
    }
  }, new BiFunction<Integer, Object, Object>() {
    @Override
    public Object apply(Integer a, Object b) {
      return 1;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void testStart() {
  Observable<String> os = OBSERVABLE_OF_5_INTEGERS
      .zipWith(OBSERVABLE_OF_5_INTEGERS, new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer a, Integer b) {
          return a + "-" + b;
        }
      });
  final ArrayList<String> list = new ArrayList<String>();
  os.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) {
      System.out.println(s);
      list.add(s);
    }
  });
  assertEquals(5, list.size());
  assertEquals("1-1", list.get(0));
  assertEquals("2-2", list.get(1));
  assertEquals("5-5", list.get(4));
}

Code example source: ReactiveX/RxJava

@Test
public void iteratorThrows() {
  Observable.just(1).zipWith(new CrashingIterable(100, 1, 100), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "hasNext()");
}

Code example source: ReactiveX/RxJava

final CountDownLatch infiniteObservable = new CountDownLatch(1);
Observable<String> os = OBSERVABLE_OF_5_INTEGERS
    .zipWith(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservable), new BiFunction<Integer, Integer, String>() {

Code example source: ReactiveX/RxJava

@Test
public void testStartAsync() throws InterruptedException {
  Observable<String> os = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1))
      .zipWith(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)), new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer a, Integer b) {
          return a + "-" + b;
        }
      }).take(5);
  TestObserver<String> to = new TestObserver<String>();
  os.subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  assertEquals(5, to.valueCount());
  assertEquals("1-1", to.values().get(0));
  assertEquals("2-2", to.values().get(1));
  assertEquals("5-5", to.values().get(4));
}

Code example source: ReactiveX/RxJava

@Test
public void bufferedCanCompleteIfOpenNeverCompletesOverlapping() {
  Observable.range(1, 50)
      .zipWith(Observable.interval(5, TimeUnit.MILLISECONDS),
          new BiFunction<Integer, Long, Integer>() {
            @Override
            public Integer apply(Integer integer, Long aLong) {
              return integer;
            }
          })
      .buffer(Observable.interval(0, 100, TimeUnit.MILLISECONDS),
          new Function<Long, Observable<?>>() {
            @Override
            public Observable<?> apply(Long a) {
              return Observable.just(a).delay(200, TimeUnit.MILLISECONDS);
            }
          })
      .test()
      .assertSubscribed()
      .awaitDone(3, TimeUnit.SECONDS)
      .assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void bufferedCanCompleteIfOpenNeverCompletesDropping() {
  Observable.range(1, 50)
      .zipWith(Observable.interval(5, TimeUnit.MILLISECONDS),
          new BiFunction<Integer, Long, Integer>() {
            @Override
            public Integer apply(Integer integer, Long aLong) {
              return integer;
            }
          })
      .buffer(Observable.interval(0, 200, TimeUnit.MILLISECONDS),
          new Function<Long, Observable<?>>() {
            @Override
            public Observable<?> apply(Long a) {
              return Observable.just(a).delay(100, TimeUnit.MILLISECONDS);
            }
          })
      .test()
      .assertSubscribed()
      .awaitDone(3, TimeUnit.SECONDS)
      .assertComplete();
}
