reactor.core.publisher.Flux.mergeWith()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.2k)|赞(0)|评价(0)|浏览(356)

本文整理了Java中reactor.core.publisher.Flux.mergeWith()方法的一些代码示例,展示了Flux.mergeWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.mergeWith()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:mergeWith

Flux.mergeWith介绍

[英]Merge data from this Flux and a Publisher into an interleaved merged sequence. Unlike #concatWith(Publisher), inner sources are subscribed to eagerly.

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
[中]将来自此流量和发布服务器的数据合并到交叉合并序列中。与#concatWith(Publisher)不同,内部资源的订阅非常踊跃。
请注意,merge是为使用异步源或有限源而定制的。当处理尚未在专用调度程序上发布的无限源时,必须在其自己的调度程序中隔离该源,因为merge会在订阅另一个源之前尝试将其耗尽。

代码示例

代码示例来源:origin: codecentric/spring-boot-admin

@GetMapping(path = "/instances/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<InstanceEvent>> eventStream() {
  return Flux.from(eventStore).map(event -> ServerSentEvent.builder(event).build()).mergeWith(ping());
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * @return the {@link Mono} signalling container termination.
 */
public Mono<Void> destroyLater() {
  if (connection != null) {
    Flux<Void> terminationSignals = null;
    while (!subscriptions.isEmpty()) {
      Map<ReactiveSubscription, Subscribers> local = new HashMap<>(subscriptions);
      List<Mono<Void>> monos = local.keySet().stream() //
          .peek(subscriptions::remove) //
          .map(ReactiveSubscription::cancel) //
          .collect(Collectors.toList());
      if (terminationSignals == null) {
        terminationSignals = Flux.concat(monos);
      } else {
        terminationSignals = terminationSignals.mergeWith(Flux.concat(monos));
      }
    }
    if (terminationSignals != null) {
      return terminationSignals.collectList()
          .doFinally(signalType -> connection.closeLater().subscribeOn(Schedulers.immediate()))
          .flatMap(all -> Mono.empty());
    }
    this.connection = null;
  }
  return Mono.empty();
}

代码示例来源:origin: spring-projects/spring-framework

@GetMapping("/infinite")
  Flux<String> infinite() {
    return Flux.just(0, 1).map(l -> "foo " + l)
        .mergeWith(Flux.never())
        .doOnCancel(() -> cancellation.onComplete());
  }
}

代码示例来源:origin: codecentric/spring-boot-admin

@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
  return Flux.from(eventStore)
        .filter(event -> event.getInstance().equals(InstanceId.of(id)))
        .flatMap(event -> registry.getInstance(event.getInstance()))
        .map(event -> ServerSentEvent.builder(event).build())
        .mergeWith(ping());
}

代码示例来源:origin: codecentric/spring-boot-admin

@GetMapping(path = "/applications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Application>> applicationsStream() {
  return Flux.from(eventPublisher)
        .flatMap(event -> registry.getInstance(event.getInstance()))
        .map(this::getApplicationForInstance)
        .flatMap(group -> toApplication(group.getT1(), group.getT2()))
        .map(application -> ServerSentEvent.builder(application).build())
        .mergeWith(ping());
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<Message<ByteBuffer, ByteBuffer>> receive() {
  Flux<Message<ByteBuffer, ByteBuffer>> channelMessages = channelState.receive(() -> commands.observeChannels() //
      .filter(message -> channelState.getTargets().contains(message.getChannel())) //
      .map(message -> new ChannelMessage<>(message.getChannel(), message.getMessage())));
  Flux<Message<ByteBuffer, ByteBuffer>> patternMessages = patternState.receive(() -> commands.observePatterns() //
      .filter(message -> patternState.getTargets().contains(message.getPattern())) //
      .map(message -> new PatternMessage<>(message.getPattern(), message.getChannel(), message.getMessage())));
  return channelMessages.mergeWith(patternMessages);
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  String path = request.getURI().getPath();
  switch (path) {
    case "/write-and-flush":
      return response.writeAndFlushWith(
          testInterval(Duration.ofMillis(50), 2)
              .map(longValue -> wrap("data" + longValue + "\n", response))
              .map(Flux::just)
              .mergeWith(Flux.never()));
    case "/write-and-complete":
      return response.writeWith(
          chunks1K().take(64).map(s -> wrap(s, response)));
    case "/write-and-never-complete":
      // Reactor requires at least 50 to flush, Tomcat/Undertow 8, Jetty 1
      return response.writeWith(
          chunks1K().take(64).map(s -> wrap(s, response)).mergeWith(Flux.never()));
    default:
      return response.writeWith(Flux.empty());
  }
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
  public Mono<Void> handle(WebSocketSession session) {
    return Flux.never().mergeWith(session.close(CloseStatus.GOING_AWAY)).then();
  }
}

代码示例来源:origin: spring-projects/spring-data-redis

private <C, B> Flux<Message<C, B>> doReceive(SerializationPair<C> channelSerializer,
    SerializationPair<B> messageSerializer, Mono<ReactiveSubscription> subscription, ByteBuffer[] patterns,
    ByteBuffer[] channels) {
  Flux<Message<ByteBuffer, ByteBuffer>> messageStream = subscription.flatMapMany(it -> {
    Mono<Void> subscribe = subscribe(patterns, channels, it);
    MonoProcessor<ChannelMessage<ByteBuffer, ByteBuffer>> terminalProcessor = MonoProcessor.create();
    return it.receive().mergeWith(subscribe.then(Mono.defer(() -> {
      getSubscribers(it).registered();
      return Mono.empty();
    }))).doOnCancel(() -> {
      Subscribers subscribers = getSubscribers(it);
      if (subscribers.unregister()) {
        subscriptions.remove(it);
        it.unsubscribe().subscribe(v -> terminalProcessor.onComplete(), terminalProcessor::onError);
      }
    }).mergeWith(terminalProcessor);
  });
  return messageStream
      .map(message -> readMessage(channelSerializer.getReader(), messageSerializer.getReader(), message));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void shouldPropagateOnCompleteWithMergedElementsCorrectly() {
  Flux<String> switchTransformed = Flux.empty()
                     .switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf)
                                            .mergeWith(Flux.just("1", "2", "3")));
  StepVerifier.create(switchTransformed)
        .expectNext("1", "2", "3")
        .expectComplete()
        .verify(Duration.ofSeconds(10));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noStackOverflow() {
  int n = 5000;
  
  Flux<Integer> source = Flux.just(1);
  
  Flux<Integer> result = source;
  
  for (int i = 0; i < n; i++) {
    result = result.mergeWith(source);
  }
  
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  
  result.subscribe(ts);
  
  ts.assertValueCount(n + 1)
  .assertNoError()
  .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void mergeWithNoStackoverflow() {
  int n = 5000;
  Flux<Integer> source = Flux.just(1);
  Flux<Integer> result = source;
  for (int i = 0; i < n; i++) {
    result = result.mergeWith(source);
  }
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  result.subscribe(ts);
  ts.assertValueCount(n + 1)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void dontBreakFluxArrayFlatmap() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    
    Flux.just(1, 2).flatMap(Flux::just).mergeWith(Flux.just(3))
    .subscribe(ts);

    
    ts.assertValues(1, 2, 3)
    .assertNoError()
    .assertComplete();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noStackOverflow2() {
  int n = 5000;
  
  Flux<Integer> source = Flux.just(1, 2).flatMap(Flux::just);
  Flux<Integer> add = Flux.just(3);
  
  Flux<Integer> result = source;
  
  for (int i = 0; i < n; i++) {
    result = result.mergeWith(add);
  }
  
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  
  result.subscribe(ts);
  
  ts.assertValueCount(n + 2)
  .assertNoError()
  .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noStackOverflow3() {
  int n = 5000;
  
  Flux<Flux<Integer>> source = Flux.just(Flux.just(1), Flux.just(2));
  Flux<Flux<Integer>> add = Flux.just(Flux.just(3));
  
  Flux<Flux<Integer>> result = source;
  
  for (int i = 0; i < n; i++) {
    result = result.mergeWith(add);
  }
  
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  
  result.subscribe(ts);
  
  ts.assertValueCount(n + 2)
  .assertNoError()
  .assertComplete();
}

代码示例来源:origin: rsocket/rsocket-java

private static Flux<Long> input() {
 Flux<Long> interval = Flux.interval(Duration.ofMillis(1)).onBackpressureDrop();
 for (int i = 0; i < 10; i++) {
  interval = interval.mergeWith(interval);
 }
 return interval;
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

private static Mono<List<DomainSummary>> listAvailableDomains(CloudFoundryClient cloudFoundryClient, String organizationId) {
  return requestListPrivateDomains(cloudFoundryClient, organizationId)
    .map(DefaultApplications::toDomain)
    .mergeWith(requestListSharedDomains(cloudFoundryClient)
      .map(DefaultApplications::toDomain))
    .collectList();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<Domain> list() {
  return this.cloudFoundryClient
    .flatMapMany(cloudFoundryClient -> requestListPrivateDomains(cloudFoundryClient)
      .map(DefaultDomains::toDomain)
      .mergeWith(requestListSharedDomains(cloudFoundryClient)
        .map(DefaultDomains::toDomain)))
    .transform(OperationsLogging.log("List Domains"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

private static Mono<List<String>> getDomainNames(CloudFoundryClient cloudFoundryClient, String organizationId) {
  return requestListPrivateDomains(cloudFoundryClient, organizationId)
    .map(resource -> resource.getEntity().getName())
    .mergeWith(requestListSharedDomains(cloudFoundryClient)
      .map(resource -> resource.getEntity().getName()))
    .collectList();
}

代码示例来源:origin: mohitsinha/tutorials

@Test
public void interleave() {
  Flux<Long> delay = Flux.interval(Duration.ofMillis(5));
  Flux<String> alphabetsWithDelay = Flux.just("A", "B").zipWith(delay, (s, l) -> s);
  Flux<String> alphabetsWithoutDelay = Flux.just("C", "D");
  Flux<String> interleavedFlux = alphabetsWithDelay.mergeWith(alphabetsWithoutDelay);
  StepVerifier.create(interleavedFlux).expectNext("C", "D", "A", "B").verifyComplete();
  Flux<String> nonInterleavedFlux = alphabetsWithDelay.concatWith(alphabetsWithoutDelay);
  StepVerifier.create(nonInterleavedFlux).expectNext("A", "B", "C", "D").verifyComplete();
}

相关文章

Flux类方法