reactor.core.publisher.Flux.concatMap()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(663)

本文整理了Java中reactor.core.publisher.Flux.concatMap()方法的一些代码示例,展示了Flux.concatMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.concatMap()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:concatMap

Flux.concatMap介绍

[英]Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.

There are three dimensions to this operator that can be compared with #flatMap(Function) and #flatMapSequential(Function):

  • Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
  • Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
  • Interleaving: this operator does not let values from different inners interleave (concatenation).

Errors will immediately short circuit current concat backlog.
[中]将此流量发出的元素异步转换为发布服务器,然后将这些内部发布服务器展平为单个流量,并使用串联保持顺序。
此运算符有三个维度可与#flatMap(函数)和#flatMapSequential(函数)进行比较:
*生成内部文件和订阅:此操作符在生成下一个内部文件并订阅它之前,等待一个内部文件完成。
*展平值的顺序:该操作符自然保留与源元素相同的顺序,将每个源元素的内部元素按顺序连接起来。
*交错:该运算符不允许来自不同内部的值交错(串联)。
错误会立即使电流短路。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
protected Mono<Void> writeAndFlushWithInternal(
    Publisher<? extends Publisher<? extends DataBuffer>> body) {
  return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<NumericResponse<Collection<ByteBuffer>, Long>> touch(Publisher<Collection<ByteBuffer>> keysCollection) {
  return connection.execute(cmd -> Flux.from(keysCollection).concatMap((keys) -> {
    Assert.notEmpty(keys, "Keys must not be null!");
    return cmd.touch(keys.toArray(new ByteBuffer[keys.size()])).map((value) -> new NumericResponse<>(keys, value));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<MultiValueResponse<ByteBuffer, ByteBuffer>> keys(Publisher<ByteBuffer> patterns) {
  return connection.execute(cmd -> Flux.from(patterns).concatMap(pattern -> {
    Assert.notNull(pattern, "Pattern must not be null!");
    // TODO: stream elements instead of collection
    return cmd.keys(pattern).collectList().map(value -> new MultiValueResponse<>(pattern, value));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<PopResponse> bPop(Publisher<BPopCommand> commands) {
  return connection.executeDedicated(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKeys(), "Keys must not be null!");
    Assert.notNull(command.getDirection(), "Direction must not be null!");
    long timeout = command.getTimeout().get(ChronoUnit.SECONDS);
    Mono<PopResult> mappedMono = (ObjectUtils.nullSafeEquals(Direction.RIGHT, command.getDirection())
        ? cmd.brpop(timeout, command.getKeys().stream().toArray(ByteBuffer[]::new))
        : cmd.blpop(timeout, command.getKeys().stream().toArray(ByteBuffer[]::new)))
            .map(kv -> Arrays.asList(kv.getKey(), kv.getValue())).map(PopResult::new);
    return mappedMono.map(value -> new PopResponse(command, value));
  }));
}

代码示例来源:origin: spring-projects/spring-framework

@Nullable Map<String, Object> hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
      (mimeType != null ? getAsciiBytes("Content-Type: " + mimeType + "\r\n") : new byte[0]);
  return Flux.from(inputStream).
      concatMap(region -> {
        if (!region.getResource().isReadable()) {
          return Flux.error(new EncodingException("Resource " +

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<MultiValueResponse<List<ByteBuffer>, ByteBuffer>> mGet(Publisher<List<ByteBuffer>> keyCollections) {
  return connection.execute(cmd -> Flux.from(keyCollections).concatMap((keys) -> {
    Assert.notNull(keys, "Keys must not be null!");
    return cmd.mget(keys.toArray(new ByteBuffer[0])).map((value) -> value.getValueOrElse(EMPTY_BYTE_BUFFER))
        .collectList().map((values) -> new MultiValueResponse<>(keys, values));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<ByteBufferResponse<BRPopLPushCommand>> bRPopLPush(Publisher<BRPopLPushCommand> commands) {
  return connection.executeDedicated(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    Assert.notNull(command.getDestination(), "Destination key must not be null!");
    Assert.notNull(command.getTimeout(), "Timeout must not be null!");
    return cmd.brpoplpush(command.getTimeout().get(ChronoUnit.SECONDS), command.getKey(), command.getDestination())
        .map(value -> new ByteBufferResponse<>(command, value));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<NumericResponse<KeyCommand, Long>> strLen(Publisher<KeyCommand> commands) {
  return connection.execute(cmd -> {
    return Flux.from(commands).concatMap(command -> {
      return cmd.strlen(command.getKey()).map(respValue -> new NumericResponse<>(command, respValue));
    });
  });
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
    ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  return Flux.from(inputStream).
      take(1).
      concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<SInterCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKeys(), "Keys must not be null!");
    Flux<ByteBuffer> result = cmd.sinter(command.getKeys().toArray(new ByteBuffer[0]));
    return Mono.just(new CommandResponse<>(command, result));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notEmpty(command.getKeyValuePairs(), "Pairs must not be null or empty!");
    return cmd.msetnx(command.getKeyValuePairs()).map((value) -> new BooleanResponse<>(command, value));
  }));
}

代码示例来源:origin: spring-projects/spring-framework

@Override
protected Mono<Void> writeAndFlushWithInternal(
    Publisher<? extends Publisher<? extends DataBuffer>> body) {
  return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hGetAll(
    Publisher<KeyCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    Mono<Map<ByteBuffer, ByteBuffer>> result = cmd.hgetall(command.getKey());
    return Mono.just(new CommandResponse<>(command, result.flatMapMany(v -> Flux.fromStream(v.entrySet().stream()))));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
  public Flux<ReactiveRedisConnection.BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {

    return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

      if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeyValuePairs().keySet())) {
        return super.mSetNX(Mono.just(command));
      }

      return Mono
          .error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for MSETNX command."));
    }));
  }
}

代码示例来源:origin: line/armeria

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
  // Prefetch 1 message because Armeria's HttpRequestSubscriber consumes messages one by one.
  return write(Flux.from(body).concatMap(Flux::from, 1));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDiffCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKeys(), "Keys must not be null!");
    Flux<ByteBuffer> result = cmd.sdiff(command.getKeys().toArray(new ByteBuffer[0]));
    return Mono.just(new CommandResponse<>(command, result));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<ReactiveRedisConnection.NumericResponse<BitOpCommand, Long>> bitOp(Publisher<BitOpCommand> commands) {
  return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
    List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
    keys.add(command.getDestinationKey());
    if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
      return super.bitOp(Mono.just(command));
    }
    return Mono
        .error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for BITOP command."));
  }));
}

代码示例来源:origin: org.springframework/spring-core

@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
    ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  return Flux.from(inputStream).
      take(1).
      concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<CommandResponse<KeyCommand, DataType>> type(Publisher<KeyCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    return cmd.type(command.getKey()).map(LettuceConverters::toDataType)
        .map(respValue -> new CommandResponse<>(command, respValue));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUnionStoreCommand> commands) {
  return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");
    if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getSourceKeys())) {
      return super.zUnionStore(Mono.just(command));
    }
    return Mono
        .error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for ZUNIONSTORE command."));
  }));
}

相关文章

Flux类方法