reactor.core.publisher.Flux.zipWith()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.7k)|赞(0)|评价(0)|浏览(386)

本文整理了Java中reactor.core.publisher.Flux.zipWith()方法的一些代码示例,展示了Flux.zipWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.zipWith()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:zipWith

Flux.zipWith介绍

[英]Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
[中]用另一个发布源压缩这个流量,也就是说,等待两者都发出一个元素,并将这些元素组合成一个Tuple2。操作员将继续执行此操作,直到任何源完成。错误将立即转发。这种“分步合并”处理在分散-聚集场景中特别有用。

代码示例

代码示例来源:origin: reactor/reactor-core

Flux<Integer> exponentialRepeatScenario1() {
  AtomicInteger i = new AtomicInteger();
  return Mono.fromCallable(i::incrementAndGet)
        .repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                      .flatMap(time -> Mono.delay(Duration.ofSeconds(
                          time))));
}

代码示例来源:origin: reactor/reactor-core

Mono<String> exponentialRetryScenario() {
  AtomicInteger i = new AtomicInteger();
  return Mono.<String>create(s -> {
    if (i.incrementAndGet() == 4) {
      s.success("hey");
    }
    else {
      s.error(new RuntimeException("test " + i));
    }
  }).retryWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}

代码示例来源:origin: reactor/reactor-core

Flux<String> exponentialRetryScenario() {
  AtomicInteger i = new AtomicInteger();
  return Flux.<String>create(s -> {
    if (i.incrementAndGet() == 4) {
      s.next("hey");
    }
    else {
      s.error(new RuntimeException("test " + i));
    }
  }).retryWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_errorFromUpstreamFailure() {
  return Arrays.asList(
      scenario(f -> f.zipWith(Flux.just(1, 2, 3), 3, (a, b) -> a))
        .prefetch(3),
      scenario(f -> f.zipWith(Flux.<String>error(exception()),
          (a, b) -> a)).shouldHitDropErrorHookAfterTerminate(false),
      scenario(f -> f.zipWith(Flux.<String>error(exception()).hide(),
          (a, b) -> a)));
}

代码示例来源:origin: reactor/reactor-core

Flux<String> exponentialRepeatScenario2() {
  AtomicInteger i = new AtomicInteger();
  return Mono.<String>create(s -> {
    if (i.incrementAndGet() == 4) {
      s.success("hey");
    }
    else {
      s.success();
    }
  }).repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void syncFusionMapToNull() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    .zipWith(Flux.fromIterable(Arrays.asList(1, 2))
           .map(v -> v == 2 ? null : v), (a, b) -> a + b)
    .subscribe(ts);
  ts.assertValues(2)
   .assertError(NullPointerException.class)
   .assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * See https://github.com/reactor/reactor-core/issues/453
 */
@Test
public void testDrainSyncCompletesSeveralBatches() {
  //both hide and just with 2 elements are necessary to go into SYNC mode
  StepVerifier.create(Flux.just(1, 2)
              .flatMapIterable(t -> IntStream.rangeClosed(0, 35).boxed().collect(Collectors.toList()))
              .hide()
              .zipWith(Flux.range(1000, 100))
              .count())
        .expectNext(72L)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void sameLength() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source = Flux.fromIterable(Arrays.asList(1, 2));
  source.zipWith(source, (a, b) -> a + b)
     .subscribe(ts);
  ts.assertValues(2, 4)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void differentLengthOpt() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source1 = Flux.fromIterable(Arrays.asList(1, 2));
  Flux<Integer> source2 = Flux.just(1, 2, 3);
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertValues(2, 4)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nonEmptyAndEmpty() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source1 = Flux.just(1, 2, 3);
  Flux<Integer> source2 = Flux.fromIterable(Collections.emptyList());
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptyNonEmpty() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source1 = Flux.fromIterable(Collections.emptyList());
  Flux<Integer> source2 = Flux.just(1, 2, 3);
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void differentLength() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source1 = Flux.fromIterable(Arrays.asList(1, 2));
  Flux<Integer> source2 = Flux.just(1, 2, 3);
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertValues(2, 4)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * See https://github.com/reactor/reactor-core/issues/453
 */
@Test
public void testDrainAsyncCompletesSeveralBatches() {
  StepVerifier.create(Flux.range(0, 72)
              .collectList()
              .flatMapIterable(Function.identity())
              .zipWith(Flux.range(1000, 100))
              .count())
        .expectNext(72L)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void sameLengthOptimized() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source = Flux.just(1, 2);
  source.zipWith(source, (a, b) -> a + b)
     .subscribe(ts);
  ts.assertValues(2, 4)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorHandlingRetryWhenEquatesRetry() {
  Flux<String> flux =
  Flux.<String>error(new IllegalArgumentException())
      .retryWhen(companion -> companion
          .zipWith(Flux.range(1, 4), (error, index) -> { // <1>
            if (index < 4) return index; // <2>
            else throw Exceptions.propagate(error); // <3>
          })
      );
  StepVerifier.create(flux)
        .verifyError(IllegalArgumentException.class);
  StepVerifier.create(Flux.<String>error(new IllegalArgumentException()).retry(3))
        .verifyError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptyScalar() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source1 = Flux.empty();
  Flux<Integer> source2 = Flux.just(1);
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scalarNonScalar() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source1 = Flux.just(1);
  Flux<Integer> source2 = Flux.just(1, 2, 3);
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertValues(2)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scalarNonScalarOpt() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source1 = Flux.just(1);
  Flux<Integer> source2 = Flux.just(1, 2, 3);
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertValues(2)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scalarScalar() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux<Integer> source1 = Flux.just(1);
  Flux<Integer> source2 = Flux.just(1);
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertValues(2)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scalarNonScalarBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux<Integer> source1 = Flux.just(1);
  Flux<Integer> source2 = Flux.just(1, 2, 3);
  source1.zipWith(source2, (a, b) -> a + b)
      .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(1);
  ts.assertValues(2)
   .assertNoError()
   .assertComplete();
}

相关文章

Flux类方法