reactor.core.publisher.Flux.from()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(13.4k)|赞(0)|评价(0)|浏览(612)

本文整理了Java中reactor.core.publisher.Flux.from()方法的一些代码示例,展示了Flux.from()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flux.from()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:from

Flux.from介绍

[英]Decorate the specified Publisher with the Flux API.
[中]使用Flux API装饰(包装)指定的Publisher(发布者)。

代码示例

代码示例来源:origin: spring-projects/spring-framework

/**
 * Concatenates, in order, every inner publisher emitted by {@code body}
 * into one flux and passes that flux to the {@code writeHandler}.
 * @param body the publisher of buffer publishers to write
 * @return the result produced by the write handler
 */
@Override
protected Mono<Void> writeAndFlushWithInternal(
    Publisher<? extends Publisher<? extends DataBuffer>> body) {
  return this.writeHandler.apply(
      Flux.from(body).concatMap(inner -> Flux.from(inner)));
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * For each incoming command, checks that its key is present, issues
 * {@code hgetall} for that key and pairs the command with a flux of the
 * returned map's entries.
 * @param commands the commands to run, processed sequentially
 * @return one response per command, carrying the entry flux
 */
@Override
public Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hGetAll(
    Publisher<KeyCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    Mono<Map<ByteBuffer, ByteBuffer>> hash = cmd.hgetall(command.getKey());
    // Expand the single map into a stream of its individual entries.
    Flux<Map.Entry<ByteBuffer, ByteBuffer>> entries =
        hash.flatMapMany(fields -> Flux.fromStream(fields.entrySet().stream()));
    return Mono.just(new CommandResponse<>(command, entries));
  }));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Pass through the buffers of the given {@link Publisher} until the sum of
 * their {@linkplain DataBuffer#readableByteCount() readable byte counts}
 * reaches {@code maxByteCount}, or until the publisher completes. The buffer
 * that crosses the limit is sliced so that the total never exceeds it.
 * @param publisher the publisher to filter
 * @param maxByteCount the maximum byte count
 * @return a flux whose maximum byte count is {@code maxByteCount}
 */
public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
  Assert.notNull(publisher, "Publisher must not be null");
  Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
  // defer() gives every subscriber its own counter
  return Flux.defer(() -> {
    AtomicLong budget = new AtomicLong(maxByteCount);
    return Flux.from(publisher)
        .map(buffer -> {
          long left = budget.addAndGet(-buffer.readableByteCount());
          if (left >= 0) {
            return buffer;
          }
          // This buffer overshoots the limit: keep only the leading part that still fits.
          int allowed = buffer.readableByteCount() + (int) left;
          return buffer.slice(0, allowed);
        })
        .takeUntil(buffer -> budget.get() <= 0);
  }); // no doOnDiscard necessary, as this method does not drop buffers
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Runs {@code brpoplpush} on a dedicated connection for each incoming
 * command after checking that key, destination and timeout are present.
 * @param commands the commands to run, processed sequentially
 * @return one response per value produced by {@code brpoplpush}
 */
@Override
public Flux<ByteBufferResponse<BRPopLPushCommand>> bRPopLPush(Publisher<BRPopLPushCommand> commands) {
  return connection.executeDedicated(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    Assert.notNull(command.getDestination(), "Destination key must not be null!");
    Assert.notNull(command.getTimeout(), "Timeout must not be null!");
    long timeoutSeconds = command.getTimeout().get(ChronoUnit.SECONDS);
    return cmd.brpoplpush(timeoutSeconds, command.getKey(), command.getDestination())
        .map(popped -> new ByteBufferResponse<>(command, popped));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Maps each incoming range command to a response holding the flux of
 * records returned by {@code xrange} for the command's key, range and limit.
 * @param commands the range commands to run
 * @return one response per command, carrying the record flux
 */
@Override
public Flux<CommandResponse<RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<RangeCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).map(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    Assert.notNull(command.getRange(), "Range must not be null!");
    Assert.notNull(command.getLimit(), "Limit must not be null!");
    // Translate the range and limit into their Lettuce counterparts.
    io.lettuce.core.Range<String> range = RangeConverter.toRange(command.getRange(), Function.identity());
    io.lettuce.core.Limit limit = LettuceConverters.toLimit(command.getLimit());
    Flux<ByteBufferRecord> records = cmd.xrange(command.getKey(), range, limit)
        .map(message -> StreamRecords.newRecord()
            .in(message.getStream())
            .withId(message.getId())
            .ofBuffer(message.getBody()));
    return new CommandResponse<>(command, records);
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Maps each incoming read command to a response holding the result of
 * {@code doRead}. A command whose read options carry a block value greater
 * than zero is run through {@code executeDedicated}; all others go through
 * {@code execute}.
 * @param commands the read commands to run
 * @return one response per command
 */
@Override
public Flux<CommandResponse<ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReadCommand> commands) {
  return Flux.from(commands).map(command -> {
    Assert.notNull(command.getStreamOffsets(), "StreamOffsets must not be null!");
    Assert.notNull(command.getReadOptions(), "ReadOptions must not be null!");
    StreamReadOptions options = command.getReadOptions();
    // No block value, or a non-positive one: run through the regular execute path.
    if (options.getBlock() == null || options.getBlock() <= 0) {
      return new CommandResponse<>(command, connection.execute(cmd -> doRead(command, options, cmd)));
    }
    return new CommandResponse<>(command, connection.executeDedicated(cmd -> doRead(command, options, cmd)));
  });
}

代码示例来源:origin: spring-projects/spring-framework

@Nullable Map<String, Object> hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
      (mimeType != null ? getAsciiBytes("Content-Type: " + mimeType + "\r\n") : new byte[0]);
  return Flux.from(inputStream).
      concatMap(region -> {
        if (!region.getResource().isReadable()) {
          return Flux.error(new EncodingException("Resource " +

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Delegates each command to the parent {@code mSetNX} when all of its keys
 * map to the same slot; otherwise emits an
 * {@link InvalidDataAccessApiUsageException}.
 * @param commands the commands to run, processed sequentially
 * @return the responses produced by the parent implementation
 */
@Override
  public Flux<ReactiveRedisConnection.BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {

    return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

      // Reject the command up front when its keys span more than one slot.
      if (!ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeyValuePairs().keySet())) {
        return Mono.error(
            new InvalidDataAccessApiUsageException("All keys must map to the same slot for MSETNX command."));
      }

      return super.mSetNX(Mono.just(command));
    }));
  }
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * For each incoming pattern, collects every key returned by {@code keys}
 * into a list and pairs that list with the pattern.
 * @param patterns the key patterns to look up, processed sequentially
 * @return one response per pattern
 */
@Override
public Flux<MultiValueResponse<ByteBuffer, ByteBuffer>> keys(Publisher<ByteBuffer> patterns) {
  return connection.execute(cmd -> Flux.from(patterns).concatMap(pattern -> {
    Assert.notNull(pattern, "Pattern must not be null!");
    // TODO: stream elements instead of collection
    return cmd.keys(pattern)
        .collectList()
        .map(matches -> new MultiValueResponse<>(pattern, matches));
  }));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Collect the given {@code dataBuffers} and join them into one
 * {@code DataBuffer}, using the factory of the first buffer. Depending on
 * the {@link DataBuffer} type, the result may be a single buffer holding all
 * the data, or a zero-copy composite that references the given buffers.
 * An empty source yields an empty {@code Mono}.
 * <p>If {@code dataBuffers} produces an error or if there is a cancel
 * signal, then all accumulated buffers will be
 * {@linkplain #release(DataBuffer) released}.
 * <p>Note that the given data buffers do <strong>not</strong> have to be
 * released. They will be released as part of the returned composite.
 * @param dataBuffers the data buffers that are to be composed
 * @return a buffer that is composed from the {@code dataBuffers} argument
 * @since 5.0.3
 */
public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
  Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
  Flux<DataBuffer> source = Flux.from(dataBuffers);
  return source
      .collectList()
      .filter(buffers -> !buffers.isEmpty())
      .map(buffers -> buffers.get(0).factory().join(buffers))
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Maps each incoming range command to a response holding the flux of
 * records returned by {@code xrevrange} for the command's key, range and
 * limit.
 * @param commands the range commands to run
 * @return one response per command, carrying the record flux
 */
@Override
public Flux<CommandResponse<RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<RangeCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).map(command -> {
    Assert.notNull(command.getKey(), "Key must not be null!");
    Assert.notNull(command.getRange(), "Range must not be null!");
    Assert.notNull(command.getLimit(), "Limit must not be null!");
    // Translate the range and limit into their Lettuce counterparts.
    io.lettuce.core.Range<String> range = RangeConverter.toRange(command.getRange(), Function.identity());
    io.lettuce.core.Limit limit = LettuceConverters.toLimit(command.getLimit());
    Flux<ByteBufferRecord> records = cmd.xrevrange(command.getKey(), range, limit)
        .map(message -> StreamRecords.newRecord()
            .in(message.getStream())
            .withId(message.getId())
            .ofBuffer(message.getBody()));
    return new CommandResponse<>(command, records);
  }));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Encodes only the first element of {@code inputStream}, delegating to the
 * single-value {@code encode} overload; later elements are not requested.
 * @param inputStream the source of values to encode
 * @param bufferFactory the factory passed on to the single-value overload
 * @param elementType the element type passed on to the single-value overload
 * @param mimeType the MIME type passed on, possibly {@code null}
 * @param hints additional hints passed on, possibly {@code null}
 * @return the buffers produced for the first element
 */
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
    ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  return Flux.from(inputStream)
      .take(1)
      .concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints));
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Delegates each command to the parent {@code bitOp} when its source keys
 * and its destination key all map to the same slot; otherwise emits an
 * {@link InvalidDataAccessApiUsageException}.
 * @param commands the commands to run, processed sequentially
 * @return the responses produced by the parent implementation
 */
@Override
public Flux<ReactiveRedisConnection.NumericResponse<BitOpCommand, Long>> bitOp(Publisher<BitOpCommand> commands) {
  return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
    // The destination key takes part in the slot check alongside the source keys.
    List<ByteBuffer> allKeys = new ArrayList<>(command.getKeys());
    allKeys.add(command.getDestinationKey());
    if (!ClusterSlotHashUtil.isSameSlotForAllKeys(allKeys)) {
      return Mono.error(
          new InvalidDataAccessApiUsageException("All keys must map to the same slot for BITOP command."));
    }
    return super.bitOp(Mono.just(command));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * For each incoming key list, issues {@code mget} and pairs the keys with
 * the list of returned values; a missing value is replaced by
 * {@code EMPTY_BYTE_BUFFER}.
 * @param keyCollections the key lists to look up, processed sequentially
 * @return one response per key list
 */
@Override
public Flux<MultiValueResponse<List<ByteBuffer>, ByteBuffer>> mGet(Publisher<List<ByteBuffer>> keyCollections) {
  return connection.execute(cmd -> Flux.from(keyCollections).concatMap(keys -> {
    Assert.notNull(keys, "Keys must not be null!");
    ByteBuffer[] keyArray = keys.toArray(new ByteBuffer[0]);
    return cmd.mget(keyArray)
        .map(entry -> entry.getValueOrElse(EMPTY_BYTE_BUFFER))
        .collectList()
        .map(found -> new MultiValueResponse<>(keys, found));
  }));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Write the {@link DataBuffer DataBuffers} emitted by {@code source} to the given
 * {@code AsynchronousFileChannel}, starting at {@code position}.
 * <p>This method does <strong>not</strong> close the channel when the flux terminates, and
 * does <strong>not</strong> {@linkplain #release(DataBuffer) release} the source buffers.
 * If releasing is required, subscribe to the returned {@code Flux} with a
 * {@link #releaseConsumer()}.
 * <p>Nothing is written until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @param position the file position at which the write is to begin; must be non-negative
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(
    Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {
  Assert.notNull(source, "'source' must not be null");
  Assert.notNull(channel, "'channel' must not be null");
  Assert.isTrue(position >= 0, "'position' must be >= 0");
  Flux<DataBuffer> upstream = Flux.from(source);
  return Flux.create(sink -> {
    AsynchronousFileChannelWriteCompletionHandler handler =
        new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
    // Register the handler for disposal before the upstream subscription is made.
    sink.onDispose(handler);
    upstream.subscribe(handler);
  });
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Concatenates, in order, every inner publisher emitted by {@code body}
 * into one flux and passes that flux to the {@code writeHandler}.
 * @param body the publisher of buffer publishers to write
 * @return the result produced by the write handler
 */
@Override
protected Mono<Void> writeAndFlushWithInternal(
    Publisher<? extends Publisher<? extends DataBuffer>> body) {
  return this.writeHandler.apply(
      Flux.from(body).concatMap(inner -> Flux.from(inner)));
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * Delegates each command to the parent {@code zUnionStore} when all of its
 * source keys map to the same slot; otherwise emits an
 * {@link InvalidDataAccessApiUsageException}.
 * @param commands the commands to run, processed sequentially
 * @return the responses produced by the parent implementation
 */
@Override
public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUnionStoreCommand> commands) {
  return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");
    // Reject the command up front when its source keys span more than one slot.
    if (!ClusterSlotHashUtil.isSameSlotForAllKeys(command.getSourceKeys())) {
      return Mono.error(
          new InvalidDataAccessApiUsageException("All keys must map to the same slot for ZUNIONSTORE command."));
    }
    return super.zUnionStore(Mono.just(command));
  }));
}

代码示例来源:origin: spring-projects/spring-data-redis

/**
 * For each incoming command, checks that its keys are present and pairs the
 * command with the flux returned by {@code sinter} for those keys.
 * @param commands the commands to run, processed sequentially
 * @return one response per command, carrying the {@code sinter} flux
 */
@Override
public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<SInterCommand> commands) {
  return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKeys(), "Keys must not be null!");
    ByteBuffer[] keys = command.getKeys().toArray(new ByteBuffer[0]);
    Flux<ByteBuffer> members = cmd.sinter(keys);
    return Mono.just(new CommandResponse<>(command, members));
  }));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Write the {@link DataBuffer DataBuffers} emitted by {@code source} to the given
 * {@code WritableByteChannel}.
 * <p>This method does <strong>not</strong> close the channel when the flux terminates, and
 * does <strong>not</strong> {@linkplain #release(DataBuffer) release} the source buffers.
 * If releasing is required, subscribe to the returned {@code Flux} with a
 * {@link #releaseConsumer()}.
 * <p>Nothing is written until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) {
  Assert.notNull(source, "'source' must not be null");
  Assert.notNull(channel, "'channel' must not be null");
  Flux<DataBuffer> upstream = Flux.from(source);
  return Flux.create(sink -> {
    WritableByteChannelSubscriber writer = new WritableByteChannelSubscriber(sink, channel);
    // Register the subscriber for disposal before the upstream subscription is made.
    sink.onDispose(writer);
    upstream.subscribe(writer);
  });
}

代码示例来源:origin: org.springframework/spring-core

/**
 * Encodes only the first element of {@code inputStream}, delegating to the
 * single-value {@code encode} overload; later elements are not requested.
 * @param inputStream the source of values to encode
 * @param bufferFactory the factory passed on to the single-value overload
 * @param elementType the element type passed on to the single-value overload
 * @param mimeType the MIME type passed on, possibly {@code null}
 * @param hints additional hints passed on, possibly {@code null}
 * @return the buffers produced for the first element
 */
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
    ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  return Flux.from(inputStream)
      .take(1)
      .concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints));
}

相关文章

Flux类方法