reactor.core.publisher.Flux.zip()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.3k)|赞(0)|评价(0)|浏览(551)

本文整理了Java中reactor.core.publisher.Flux.zip()方法的一些代码示例,展示了Flux.zip()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.zip()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:zip

Flux.zip介绍

[英]Zip multiple sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios. The Iterable#iterator() will be called on each Publisher#subscribe(Subscriber).
[中]将多个源压缩在一起,也就是说,等待所有源发出一个元素,并将这些元素组合成一个输出值(由提供的组合器构造)。操作员将继续执行此操作,直到任何源完成。错误将立即转发。这种“分步合并”处理在分散-聚集场景中特别有用。将对每个发布者#订阅者(订阅者)调用Iterable#迭代器()。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test
@SuppressWarnings("unchecked")
public void createZipWithPrefetchIterable() {
  List<Flux<Integer>> list = Arrays.asList(Flux.just(1), Flux.just(2));
  Flux<Integer> f = Flux.zip(list, 123, obj -> 0);
  assertThat(f.getPrefetch()).isEqualTo(123);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCompleteIterableEmpty() {
  StepVerifier.create(Flux.zip(Arrays.asList(), obj -> 0))
        .verifyComplete();
}

代码示例来源:origin: spring-projects/spring-framework

@Parameters(name = "client[{0}] - server [{1}]")
public static Object[][] arguments() throws IOException {
  WebSocketClient[] clients = new WebSocketClient[] {
      new TomcatWebSocketClient(),
      new JettyWebSocketClient(),
      new ReactorNettyWebSocketClient(),
      new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY))
  };
  Map<HttpServer, Class<?>> servers = new LinkedHashMap<>();
  servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class);
  servers.put(new JettyHttpServer(), JettyConfig.class);
  servers.put(new ReactorHttpServer(), ReactorNettyConfig.class);
  servers.put(new UndertowHttpServer(), UndertowConfig.class);
  Flux<WebSocketClient> f1 = Flux.fromArray(clients).concatMap(c -> Flux.just(c).repeat(servers.size()));
  Flux<HttpServer> f2 = Flux.fromIterable(servers.keySet()).repeat(clients.length);
  Flux<Class<?>> f3 = Flux.fromIterable(servers.values()).repeat(clients.length);
  return Flux.zip(f1, f2, f3).map(Tuple3::toArray).collectList().block()
      .toArray(new Object[clients.length * servers.size()][2]);
}

代码示例来源:origin: reactor/reactor-core

@Test
@SuppressWarnings("unchecked")
public void createZipWithPrefetch() {
  Flux<Integer>[] list = new Flux[]{Flux.just(1), Flux.just(2)};
  Flux<Integer> f = Flux.zip(obj -> 0, 123, list);
  assertThat(f.getPrefetch()).isEqualTo(123);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCompleteEmptySource() {
  StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), Mono.empty()))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCompleteSourceEmptyDouble() {
  DirectProcessor<Integer> d = DirectProcessor.create();
  StepVerifier.create(Flux.zip(obj -> 0, d, s -> {
    CoreSubscriber<?> a =
        ((DirectProcessor.DirectInner) d.inners().findFirst().get())
            .actual;
    Operators.complete(s);
    a.onComplete();
  }, Mono.just(1)))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCompleteSourceEmpty() {
  StepVerifier.create(Flux.zip(obj -> 0,
      Flux.just(1),
      Mono.empty()
        .hide()))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCompleteIterableCallableNull() {
  StepVerifier.create(Flux.zip(Arrays.asList(Flux.just(1),
      Mono.fromCallable(() -> null)), obj -> 0))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCompleteEmptySourceHide() {
  StepVerifier.create(Flux.zip(obj -> 0,
      Flux.just(1)
        .hide(),
      Mono.empty()))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void failNull() {
  StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), null))
        .verifyError(NullPointerException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleErrorSilent() {
  Hooks.onErrorDropped(e -> {
  });
  StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), Flux.never(), s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onError(new Exception("test"));
    s.onError(new Exception("test2"));
  }))
        .verifyErrorMessage("test");
  Hooks.resetOnErrorDropped();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCompleteCallableNull() {
  StepVerifier.create(Flux.zip(obj -> 0,
      Flux.just(1),
      Mono.fromCallable(() -> null)))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void failCombinedNull() {
  StepVerifier.create(Flux.zip(obj -> null, Flux.just(1), Flux.just(2)))
        .verifyError(NullPointerException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void zip() {
  StepVerifier.create(Flux.zip(obj -> (int) obj[0], Flux.just(1)))
        .expectNext(1)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void ignoreDoubleComplete() {
  StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), Flux.never(), s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onComplete();
    s.onComplete();
  }))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void ignoreRequestZeroHideAll() {
  StepVerifier.create(Flux.zip(obj -> (int) obj[0] + (int) obj[1],
      Flux.just(1)
        .hide(),
      Flux.just(2)
        .hide()), 0)
        .consumeSubscriptionWith(s -> s.request(0))
        .thenRequest(1)
        .expectNext(3)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCompleteCallableNullHideAll() {
  StepVerifier.create(Flux.zip(obj -> 0,
      Flux.just(1)
        .hide(),
      Mono.fromCallable(() -> null)
        .hide()))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
@SuppressWarnings("unchecked")
public void zip4() {
  StepVerifier.create(Flux.zip(Flux.just(1),
      Flux.just(2),
      Flux.just(3),
      Flux.just(4)))
        .expectNext(Tuples.of(1, 2, 3, 4))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
@SuppressWarnings("unchecked")
public void zip5() {
  StepVerifier.create(Flux.zip(Flux.just(1),
      Flux.just(2),
      Flux.just(3),
      Flux.just(4),
      Flux.just(5)))
        .expectNext(Tuples.of(1, 2, 3, 4, 5))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void subsequentSumHidden() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  range(1, 5).hide()
        .publish(o -> zip((Object[] a) -> (Integer) a[0] + (Integer) a[1], o, o
            .skip(1)))
        .subscribe(ts);
  ts.assertValues(1 + 2, 2 + 3, 3 + 4, 4 + 5)
   .assertNoError()
   .assertComplete();
}

相关文章

Flux类方法