本文整理了Java中reactor.core.publisher.Flux.from()
方法的一些代码示例,展示了Flux.from()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.from()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:from
[英]Decorate the specified Publisher with the Flux API.
[中]使用Flux API装饰指定的发布服务器。
代码示例来源:origin: spring-projects/spring-framework
@Override
protected Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hGetAll(
Publisher<KeyCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Mono<Map<ByteBuffer, ByteBuffer>> result = cmd.hgetall(command.getKey());
return Mono.just(new CommandResponse<>(command, result.flatMapMany(v -> Flux.fromStream(v.entrySet().stream()))));
}));
}
代码示例来源:origin: spring-projects/spring-framework
/**
* Relay buffers from the given {@link Publisher} until the total
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
* the given maximum byte count, or until the publisher is complete.
* @param publisher the publisher to filter
* @param maxByteCount the maximum byte count
* @return a flux whose maximum byte count is {@code maxByteCount}
*/
public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
return Flux.defer(() -> {
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.map(buffer -> {
long remainder = countDown.addAndGet(-buffer.readableByteCount());
if (remainder < 0) {
int length = buffer.readableByteCount() + (int) remainder;
return buffer.slice(0, length);
}
else {
return buffer;
}
})
.takeUntil(buffer -> countDown.get() <= 0);
}); // no doOnDiscard necessary, as this method does not drop buffers
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<ByteBufferResponse<BRPopLPushCommand>> bRPopLPush(Publisher<BRPopLPushCommand> commands) {
return connection.executeDedicated(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getDestination(), "Destination key must not be null!");
Assert.notNull(command.getTimeout(), "Timeout must not be null!");
return cmd.brpoplpush(command.getTimeout().get(ChronoUnit.SECONDS), command.getKey(), command.getDestination())
.map(value -> new ByteBufferResponse<>(command, value));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<RangeCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getRange(), "Range must not be null!");
Assert.notNull(command.getLimit(), "Limit must not be null!");
io.lettuce.core.Range<String> lettuceRange = RangeConverter.toRange(command.getRange(), Function.identity());
io.lettuce.core.Limit lettuceLimit = LettuceConverters.toLimit(command.getLimit());
return new CommandResponse<>(command, cmd.xrange(command.getKey(), lettuceRange, lettuceLimit)
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody())));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReadCommand> commands) {
return Flux.from(commands).map(command -> {
Assert.notNull(command.getStreamOffsets(), "StreamOffsets must not be null!");
Assert.notNull(command.getReadOptions(), "ReadOptions must not be null!");
StreamReadOptions readOptions = command.getReadOptions();
if (readOptions.getBlock() != null && readOptions.getBlock() > 0) {
return new CommandResponse<>(command, connection.executeDedicated(cmd -> doRead(command, readOptions, cmd)));
}
return new CommandResponse<>(command, connection.execute(cmd -> doRead(command, readOptions, cmd)));
});
}
代码示例来源:origin: spring-projects/spring-framework
@Nullable Map<String, Object> hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
(mimeType != null ? getAsciiBytes("Content-Type: " + mimeType + "\r\n") : new byte[0]);
return Flux.from(inputStream).
concatMap(region -> {
if (!region.getResource().isReadable()) {
return Flux.error(new EncodingException("Resource " +
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<ReactiveRedisConnection.BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeyValuePairs().keySet())) {
return super.mSetNX(Mono.just(command));
}
return Mono
.error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for MSETNX command."));
}));
}
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<MultiValueResponse<ByteBuffer, ByteBuffer>> keys(Publisher<ByteBuffer> patterns) {
return connection.execute(cmd -> Flux.from(patterns).concatMap(pattern -> {
Assert.notNull(pattern, "Pattern must not be null!");
// TODO: stream elements instead of collection
return cmd.keys(pattern).collectList().map(value -> new MultiValueResponse<>(pattern, value));
}));
}
代码示例来源:origin: spring-projects/spring-framework
/**
* Return a new {@code DataBuffer} composed from joining together the given
* {@code dataBuffers} elements. Depending on the {@link DataBuffer} type,
* the returned buffer may be a single buffer containing all data of the
* provided buffers, or it may be a zero-copy, composite with references to
* the given buffers.
* <p>If {@code dataBuffers} produces an error or if there is a cancel
* signal, then all accumulated buffers will be
* {@linkplain #release(DataBuffer) released}.
* <p>Note that the given data buffers do <strong>not</strong> have to be
* released. They will be released as part of the returned composite.
* @param dataBuffers the data buffers that are to be composed
* @return a buffer that is composed from the {@code dataBuffers} argument
* @since 5.0.3
*/
public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
return Flux.from(dataBuffers)
.collectList()
.filter(list -> !list.isEmpty())
.map(list -> list.get(0).factory().join(list))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<RangeCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getRange(), "Range must not be null!");
Assert.notNull(command.getLimit(), "Limit must not be null!");
io.lettuce.core.Range<String> lettuceRange = RangeConverter.toRange(command.getRange(), Function.identity());
io.lettuce.core.Limit lettuceLimit = LettuceConverters.toLimit(command.getLimit());
return new CommandResponse<>(command, cmd.xrevrange(command.getKey(), lettuceRange, lettuceLimit)
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody())));
}));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.from(inputStream).
take(1).
concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<ReactiveRedisConnection.NumericResponse<BitOpCommand, Long>> bitOp(Publisher<BitOpCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
keys.add(command.getDestinationKey());
if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
return super.bitOp(Mono.just(command));
}
return Mono
.error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for BITOP command."));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<MultiValueResponse<List<ByteBuffer>, ByteBuffer>> mGet(Publisher<List<ByteBuffer>> keyCollections) {
return connection.execute(cmd -> Flux.from(keyCollections).concatMap((keys) -> {
Assert.notNull(keys, "Keys must not be null!");
return cmd.mget(keys.toArray(new ByteBuffer[0])).map((value) -> value.getValueOrElse(EMPTY_BYTE_BUFFER))
.collectList().map((values) -> new MultiValueResponse<>(keys, values));
}));
}
代码示例来源:origin: spring-projects/spring-framework
/**
* Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
* Does <strong>not</strong> close the channel when the flux is terminated, and does
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
* {@link #releaseConsumer()}.
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @param position the file position at which the write is to begin; must be non-negative
* @return a flux containing the same buffers as in {@code source}, that starts the writing
* process when subscribed to, and that publishes any writing errors and the completion signal
*/
public static Flux<DataBuffer> write(
Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {
Assert.notNull(source, "'source' must not be null");
Assert.notNull(channel, "'channel' must not be null");
Assert.isTrue(position >= 0, "'position' must be >= 0");
Flux<DataBuffer> flux = Flux.from(source);
return Flux.create(sink -> {
AsynchronousFileChannelWriteCompletionHandler completionHandler =
new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
sink.onDispose(completionHandler);
flux.subscribe(completionHandler);
});
}
代码示例来源:origin: spring-projects/spring-framework
@Override
protected Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUnionStoreCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");
if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getSourceKeys())) {
return super.zUnionStore(Mono.just(command));
}
return Mono
.error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for ZUNIONSTORE command."));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<SInterCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKeys(), "Keys must not be null!");
Flux<ByteBuffer> result = cmd.sinter(command.getKeys().toArray(new ByteBuffer[0]));
return Mono.just(new CommandResponse<>(command, result));
}));
}
代码示例来源:origin: spring-projects/spring-framework
/**
* Write the given stream of {@link DataBuffer DataBuffers} to the given {@code WritableByteChannel}. Does
* <strong>not</strong> close the channel when the flux is terminated, and does
* <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
* source. If releasing is required, then subscribe to the returned {@code Flux} with a
* {@link #releaseConsumer()}.
* <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @return a flux containing the same buffers as in {@code source}, that starts the writing
* process when subscribed to, and that publishes any writing errors and the completion signal
*/
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) {
Assert.notNull(source, "'source' must not be null");
Assert.notNull(channel, "'channel' must not be null");
Flux<DataBuffer> flux = Flux.from(source);
return Flux.create(sink -> {
WritableByteChannelSubscriber subscriber =
new WritableByteChannelSubscriber(sink, channel);
sink.onDispose(subscriber);
flux.subscribe(subscriber);
});
}
代码示例来源:origin: org.springframework/spring-core
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.from(inputStream).
take(1).
concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
}
内容来源于网络,如有侵权,请联系作者删除!