Usage and code examples of the reactor.core.publisher.Flux.flatMap() method

Reposted by x33g5p2x on 2022-01-19 in Other

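This article collects code examples of the Java method reactor.core.publisher.Flux.flatMap() and shows how Flux.flatMap() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as a reference. Details of Flux.flatMap() are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: flatMap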

Introduction to Flux.flatMap

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allows them to interleave.

There are three dimensions to this operator that can be compared with #flatMapSequential(Function) and #concatMap(Function):

  • Generation of inners and subscription: this operator eagerly subscribes to its inners.
  • Ordering of the flattened values: this operator does not necessarily preserve the original ordering, as inner elements are flattened as they arrive.
  • Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
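
The ordering difference between the three operators can be seen in a small experiment. The following is a minimal sketch, assuming reactor-core is on the classpath; the class FlatMapOrdering and the helper slowEcho are hypothetical names made up for this illustration. Each inner Mono is delayed so that later source elements finish first.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapOrdering {

    // Hypothetical inner publisher: the delay shrinks as the value grows
    // (1 -> 300 ms, 2 -> 200 ms, 3 -> 100 ms), so later elements finish first.
    static Mono<Integer> slowEcho(int value) {
        return Mono.just(value).delayElement(Duration.ofMillis(400L - 100L * value));
    }

    // Inners are subscribed eagerly and values are emitted as they arrive.
    static List<Integer> viaFlatMap() {
        return Flux.just(1, 2, 3).flatMap(FlatMapOrdering::slowEcho).collectList().block();
    }

    // Inners are subscribed eagerly, but results are replayed in source order.
    static List<Integer> viaFlatMapSequential() {
        return Flux.just(1, 2, 3).flatMapSequential(FlatMapOrdering::slowEcho).collectList().block();
    }

    // Inners are subscribed one at a time, so source order is preserved.
    static List<Integer> viaConcatMap() {
        return Flux.just(1, 2, 3).concatMap(FlatMapOrdering::slowEcho).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:           " + viaFlatMap());
        System.out.println("flatMapSequential: " + viaFlatMapSequential());
        System.out.println("concatMap:         " + viaConcatMap());
    }
}
```

With these delays, flatMap will typically emit the values in arrival order rather than source order, while the other two return 1, 2, 3. The exact flatMap order depends on timing and should not be relied upon.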

Code examples

Code example source: spring-projects/spring-framework

private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
  return flux -> flux
      .scan(1, (count, element) -> count + 1)
      .flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
          .map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
          .orElse(Mono.empty()));
}
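
In an accumulator of this shape, what matters is the value the lambda returns: `count++` evaluates to the value before the increment, whereas `count + 1` returns the incremented value. The sketch below uses plain Java only; runningCounts is a hypothetical stand-in that mimics how a seeded scan emits the seed followed by each accumulated value, and it is not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class AttemptCounter {

    // Hypothetical stand-in for a seeded scan: emits the seed (1),
    // then the accumulator result after each incoming signal.
    static List<Integer> runningCounts(List<String> signals,
                                       BiFunction<Integer, String, Integer> accumulator) {
        List<Integer> out = new ArrayList<>();
        Integer state = 1;
        out.add(state);
        for (String signal : signals) {
            state = accumulator.apply(state, signal);
            out.add(state);
        }
        return out;
    }

    // Post-increment returns the old value, so the counter never advances.
    static List<Integer> withPostIncrement() {
        return runningCounts(List.of("a", "b", "c"), (count, element) -> count++);
    }

    // Returning count + 1 advances the counter on every signal.
    static List<Integer> withPlusOne() {
        return runningCounts(List.of("a", "b", "c"), (count, element) -> count + 1);
    }

    public static void main(String[] args) {
        System.out.println(withPostIncrement()); // prints [1, 1, 1, 1]
        System.out.println(withPlusOne());       // prints [1, 2, 3, 4]
    }
}
```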

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
  return writeWith(Flux.from(body).flatMap(p -> p));
}
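
The `flatMap(p -> p)` idiom flattens a publisher of publishers into a single stream. A minimal sketch, assuming reactor-core is available; the class FlattenNested and its sample data are hypothetical:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class FlattenNested {

    // Flattens a Flux of Flux values by mapping each inner publisher to itself.
    static List<String> flatten() {
        Flux<Flux<String>> nested = Flux.just(Flux.just("a", "b"), Flux.just("c"));
        return nested.flatMap(p -> p).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(flatten());
    }
}
```

With synchronous inner sources like these the elements come out as a, b, c. Once the inner publishers emit asynchronously, their elements may interleave.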

Code example source: codecentric/spring-boot-admin

@Override
public Mono<Void> notify(InstanceEvent event) {
  return Flux.fromIterable(delegates).flatMap(d -> d.notify(event).onErrorResume(error -> {
    log.warn("Unexpected exception while triggering notifications. Notification might not be sent.", error);
    return Mono.empty();
  })).then();
}
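
The pattern here is to attach onErrorResume to each inner publisher rather than to the merged stream, so one failing delegate does not stop the others. A minimal sketch of that idea, assuming reactor-core is available; IsolatedNotify, send, and sendAll are hypothetical names, and the "broken" recipient is invented for the illustration:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class IsolatedNotify {

    // Hypothetical notifier: fails for the name "broken", otherwise records the delivery.
    static Mono<Void> send(String name, List<String> delivered) {
        if ("broken".equals(name)) {
            return Mono.error(new IllegalStateException("cannot reach " + name));
        }
        return Mono.fromRunnable(() -> delivered.add(name));
    }

    // Each inner Mono swallows its own error, so the merged sequence keeps going.
    static List<String> sendAll(List<String> names) {
        List<String> delivered = new CopyOnWriteArrayList<>();
        Flux.fromIterable(names)
            .flatMap(name -> send(name, delivered).onErrorResume(error -> Mono.empty()))
            .then()
            .block();
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(sendAll(List.of("a", "broken", "b"))); // prints [a, b]
    }
}
```

If onErrorResume were instead applied after flatMap, the first failure would terminate the merged sequence with that error.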

Code example source: spring-projects/spring-framework

/**
 * Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
 * @param dataBuffers the source data buffers
 * @param jsonFactory the factory to use
 * @param tokenizeArrayElements if {@code true} and the "top level" JSON
 * object is an array, each element is returned individually, immediately
 * after it is received.
 * @return the result token buffers
 */
public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
    boolean tokenizeArrayElements) {
  try {
    JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
    Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, tokenizeArrayElements);
    return dataBuffers.flatMap(tokenizer::tokenize, Flux::error, tokenizer::endOfInput);
  }
  catch (IOException ex) {
    return Flux.error(ex);
  }
}
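
This example uses the three-argument overload of flatMap, which maps not only each element but also the error and completion signals to publishers. A minimal sketch of that overload, assuming reactor-core is available; the class FlatMapSignals and the "end" marker are hypothetical:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class FlatMapSignals {

    // Maps each element to two tokens, passes errors through unchanged,
    // and appends a final "end" token when the source completes.
    static List<String> tokens() {
        return Flux.just("a", "b")
            .<String>flatMap(
                s -> Flux.just(s + "1", s + "2"), // onNext mapper
                Flux::error,                      // onError mapper
                () -> Flux.just("end"))           // onComplete mapper
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(tokens());
    }
}
```

With these synchronous sources the result is a1, a2, b1, b2, end. The completion mapper plays the same role as `tokenizer::endOfInput` above: it gives the operator a chance to emit trailing elements after the last input has been seen.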

Code example source: codecentric/spring-boot-admin

@DeleteMapping(path = "/applications/{name}")
public Mono<ResponseEntity<Void>> unregister(@PathVariable("name") String name) {
  log.debug("Unregister application with name '{}'", name);
  return registry.getInstances(name)
          .flatMap(instance -> registry.deregister(instance.getId()))
          .collectList()
          .map(deregistered -> !deregistered.isEmpty()
              ? ResponseEntity.noContent().build()
              : ResponseEntity.notFound().build());
}

Code example source: codecentric/spring-boot-admin

@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
  return publisher.subscribeOn(Schedulers.newSingle("endpoint-detector"))
          .filter(event -> event instanceof InstanceStatusChangedEvent ||
                   event instanceof InstanceRegistrationUpdatedEvent)
          .flatMap(this::detectEndpoints);
}

Code example source: spring-projects/spring-framework

private Flux<?> decodePartValues(Flux<Part> parts, MethodParameter elementType, BindingContext bindingContext,
    ServerWebExchange exchange, boolean isRequired) {
  return parts.flatMap(part -> {
    ServerHttpRequest partRequest = new PartServerHttpRequest(exchange.getRequest(), part);
    ServerWebExchange partExchange = exchange.mutate().request(partRequest).build();
    if (logger.isDebugEnabled()) {
      logger.debug(exchange.getLogPrefix() + "Decoding part '" + part.name() + "'");
    }
    return readBody(elementType, isRequired, bindingContext, partExchange);
  });
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
  Flux<ContentChunk> chunks = Flux.from(body)
      .flatMap(Function.identity())
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
      .map(this::toContentChunk);
  ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
  this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
  return doCommit(this::completes);
}

Code example source: codecentric/spring-boot-admin

public void start() {
  this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
              .log(log.getName(), Level.FINEST)
              .doOnSubscribe(s -> log.debug("Started reminders"))
              .flatMap(i -> this.sendReminders())
              .onErrorContinue((ex, value) -> log.warn(
                "Unexpected error while sending reminders",
                ex
              ))
              .subscribe();
}

Code example source: spring-projects/spring-framework

@Override
protected Mono<Void> writeAndFlushWithInternal(
    Publisher<? extends Publisher<? extends DataBuffer>> bodyWithFlush) {
  return Flux.from(bodyWithFlush).flatMap(body ->
    Flux.from(body).map(b -> {
      this.body.add(b);
      return b;
    })
  ).then();
}

Code example source: codecentric/spring-boot-admin

@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
  return Flux.from(eventStore)
        .filter(event -> event.getInstance().equals(InstanceId.of(id)))
        .flatMap(event -> registry.getInstance(event.getInstance()))
        .map(event -> ServerSentEvent.builder(event).build())
        .mergeWith(ping());
}

Code example source: codecentric/spring-boot-admin

@GetMapping(path = "/applications", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Application> applications() {
  return registry.getInstances()
          .filter(Instance::isRegistered)
          .groupBy(instance -> instance.getRegistration().getName())
          .flatMap(grouped -> toApplication(grouped.key(), grouped));
}

Code example source: codecentric/spring-boot-admin

@Override
public Flux<Instance> findAll() {
  return eventStore.findAll()
           .groupBy(InstanceEvent::getInstance)
           .flatMap(f -> f.reduce(Instance.create(f.key()), Instance::apply));
}
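
The groupBy followed by flatMap combination reduces each group to a single value and merges the per-group results. A minimal sketch of the same shape, assuming reactor-core is available; GroupAndReduce and its word list are hypothetical. The results are sorted at the end because the order in which groups complete is not something to rely on.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class GroupAndReduce {

    // Groups words by first letter, sums the word lengths per group,
    // and returns "letter=total" strings in sorted order.
    static List<String> totalLengthByInitial(List<String> words) {
        return Flux.fromIterable(words)
            .groupBy(word -> word.charAt(0))
            .flatMap(group -> group
                .reduce(0, (sum, word) -> sum + word.length())
                .map(total -> group.key() + "=" + total))
            .collectSortedList()
            .block();
    }

    public static void main(String[] args) {
        // prints [a=12, b=15, c=6]
        System.out.println(totalLengthByInitial(
            List.of("apple", "avocado", "banana", "blueberry", "cherry")));
    }
}
```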

Code example source: codecentric/spring-boot-admin

@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
  return publisher.subscribeOn(Schedulers.newSingle("status-updater"))
          .filter(event -> event instanceof InstanceRegisteredEvent ||
                   event instanceof InstanceRegistrationUpdatedEvent)
          .flatMap(event -> updateStatus(event.getInstance()));
}

Code example source: spring-projects/spring-framework

@Override
public void decodeToMono() {
  List<Pojo> expected = Arrays.asList(pojo1, pojo2);
  Flux<DataBuffer> input = Flux.just(expected)
      .map(this::writeObject)
      .flatMap(this::dataBuffer);
  ResolvableType elementType = ResolvableType.forClassWithGenerics(List.class, Pojo.class);
  testDecodeToMono(input, elementType, step -> step
      .expectNext(expected)
      .expectComplete()
      .verify(), null, null);
}

Code example source: spring-projects/spring-framework

@Override
public void decode() {
  Flux<DataBuffer> input = Flux.just(this.pojo1, this.pojo2)
      .map(this::writeObject)
      .flatMap(this::dataBuffer);
  testDecodeAll(input, Pojo.class, step -> step
      .expectNext(pojo1)
      .expectNext(pojo2)
      .verifyComplete());
}

Code example source: spring-projects/spring-framework

@Test
public void decodeChunksToMono() {
  byte[] full = this.testMsg1.toByteArray();
  byte[] chunk1 = Arrays.copyOfRange(full, 0, full.length / 2);
  byte[] chunk2 = Arrays.copyOfRange(full, chunk1.length, full.length);
  Flux<DataBuffer> input = Flux.just(chunk1, chunk2)
      .flatMap(bytes -> Mono.defer(() -> {
        DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(bytes.length);
        dataBuffer.write(bytes);
        return Mono.just(dataBuffer);
      }));
  testDecodeToMono(input, Msg.class, step -> step
      .expectNext(this.testMsg1)
      .verifyComplete());
}
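
Wrapping the allocation in Mono.defer inside flatMap keeps the work lazy: nothing is allocated when the pipeline is assembled, and each subscription allocates afresh. A minimal sketch that only demonstrates this laziness, assuming reactor-core is available; DeferredAllocation and its counter are hypothetical, and plain byte arrays stand in for DataBuffer:

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DeferredAllocation {

    // Returns the allocation count observed before subscribing,
    // after the first subscription, and after the second subscription.
    static int[] allocationCounts() {
        AtomicInteger allocations = new AtomicInteger();
        Flux<byte[]> buffers = Flux.just(4, 8)
            .flatMap(size -> Mono.defer(() -> {
                allocations.incrementAndGet(); // runs only when the inner Mono is subscribed
                return Mono.just(new byte[size]);
            }));

        int beforeSubscribe = allocations.get();
        buffers.collectList().block();
        int afterFirst = allocations.get();
        buffers.collectList().block();
        int afterSecond = allocations.get();
        return new int[] {beforeSubscribe, afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        int[] counts = allocationCounts();
        System.out.println(counts[0] + ", " + counts[1] + ", " + counts[2]); // prints 0, 2, 4
    }
}
```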

Code example source: spring-projects/spring-framework

@Override
public void encode() throws Exception {
  Flux<DataBuffer> input = Flux.just(this.fooBytes, this.barBytes)
      .flatMap(bytes -> Mono.defer(() -> {
        DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(bytes.length);
        dataBuffer.write(bytes);
        return Mono.just(dataBuffer);
      }));
  testEncodeAll(input, DataBuffer.class, step -> step
      .consumeNextWith(expectBytes(this.fooBytes))
      .consumeNextWith(expectBytes(this.barBytes))
      .verifyComplete());
}

Code example source: spring-projects/spring-framework

@Override
@Test
public void decode() {
  Flux<DataBuffer> input = Flux.just(this.testMsg1, this.testMsg2)
      .flatMap(msg -> Mono.defer(() -> {
        DataBuffer buffer = this.bufferFactory.allocateBuffer();
        try {
          msg.writeDelimitedTo(buffer.asOutputStream());
          return Mono.just(buffer);
        }
        catch (IOException e) {
          release(buffer);
          return Mono.error(e);
        }
      }));
  testDecodeAll(input, Msg.class, step -> step
      .expectNext(this.testMsg1)
      .expectNext(this.testMsg2)
      .verifyComplete());
}
