本文整理了Java中reactor.core.publisher.Flux.concatMap()
方法的一些代码示例,展示了Flux.concatMap()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.concatMap()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:concatMap
[英]Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with #flatMap(Function) and #flatMapSequential(Function):
Errors will immediately short circuit current concat backlog.
[中]将此流量发出的元素异步转换为发布服务器,然后将这些内部发布服务器展平为单个流量,并使用串联保持顺序。
此运算符有三个维度可与#flatMap(函数)和#flatMapSequential(函数)进行比较:
*生成内部文件和订阅:此操作符在生成下一个内部文件并订阅它之前,等待一个内部文件完成。
*展平值的顺序:该操作符自然保留与源元素相同的顺序,将每个源元素的内部元素按顺序连接起来。
*交错:该运算符不允许来自不同内部的值交错(串联)。
错误会立即使电流短路。
代码示例来源:origin: spring-projects/spring-framework
@Override
protected Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<NumericResponse<Collection<ByteBuffer>, Long>> touch(Publisher<Collection<ByteBuffer>> keysCollection) {
return connection.execute(cmd -> Flux.from(keysCollection).concatMap((keys) -> {
Assert.notEmpty(keys, "Keys must not be null!");
return cmd.touch(keys.toArray(new ByteBuffer[keys.size()])).map((value) -> new NumericResponse<>(keys, value));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<MultiValueResponse<ByteBuffer, ByteBuffer>> keys(Publisher<ByteBuffer> patterns) {
return connection.execute(cmd -> Flux.from(patterns).concatMap(pattern -> {
Assert.notNull(pattern, "Pattern must not be null!");
// TODO: stream elements instead of collection
return cmd.keys(pattern).collectList().map(value -> new MultiValueResponse<>(pattern, value));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<PopResponse> bPop(Publisher<BPopCommand> commands) {
return connection.executeDedicated(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKeys(), "Keys must not be null!");
Assert.notNull(command.getDirection(), "Direction must not be null!");
long timeout = command.getTimeout().get(ChronoUnit.SECONDS);
Mono<PopResult> mappedMono = (ObjectUtils.nullSafeEquals(Direction.RIGHT, command.getDirection())
? cmd.brpop(timeout, command.getKeys().stream().toArray(ByteBuffer[]::new))
: cmd.blpop(timeout, command.getKeys().stream().toArray(ByteBuffer[]::new)))
.map(kv -> Arrays.asList(kv.getKey(), kv.getValue())).map(PopResult::new);
return mappedMono.map(value -> new PopResponse(command, value));
}));
}
代码示例来源:origin: spring-projects/spring-framework
@Nullable Map<String, Object> hints) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
Assert.notNull(elementType, "'elementType' must not be null");
(mimeType != null ? getAsciiBytes("Content-Type: " + mimeType + "\r\n") : new byte[0]);
return Flux.from(inputStream).
concatMap(region -> {
if (!region.getResource().isReadable()) {
return Flux.error(new EncodingException("Resource " +
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<MultiValueResponse<List<ByteBuffer>, ByteBuffer>> mGet(Publisher<List<ByteBuffer>> keyCollections) {
return connection.execute(cmd -> Flux.from(keyCollections).concatMap((keys) -> {
Assert.notNull(keys, "Keys must not be null!");
return cmd.mget(keys.toArray(new ByteBuffer[0])).map((value) -> value.getValueOrElse(EMPTY_BYTE_BUFFER))
.collectList().map((values) -> new MultiValueResponse<>(keys, values));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<ByteBufferResponse<BRPopLPushCommand>> bRPopLPush(Publisher<BRPopLPushCommand> commands) {
return connection.executeDedicated(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getDestination(), "Destination key must not be null!");
Assert.notNull(command.getTimeout(), "Timeout must not be null!");
return cmd.brpoplpush(command.getTimeout().get(ChronoUnit.SECONDS), command.getKey(), command.getDestination())
.map(value -> new ByteBufferResponse<>(command, value));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<NumericResponse<KeyCommand, Long>> strLen(Publisher<KeyCommand> commands) {
return connection.execute(cmd -> {
return Flux.from(commands).concatMap(command -> {
return cmd.strlen(command.getKey()).map(respValue -> new NumericResponse<>(command, respValue));
});
});
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.from(inputStream).
take(1).
concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<SInterCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKeys(), "Keys must not be null!");
Flux<ByteBuffer> result = cmd.sinter(command.getKeys().toArray(new ByteBuffer[0]));
return Mono.just(new CommandResponse<>(command, result));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notEmpty(command.getKeyValuePairs(), "Pairs must not be null or empty!");
return cmd.msetnx(command.getKeyValuePairs()).map((value) -> new BooleanResponse<>(command, value));
}));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
protected Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hGetAll(
Publisher<KeyCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
Mono<Map<ByteBuffer, ByteBuffer>> result = cmd.hgetall(command.getKey());
return Mono.just(new CommandResponse<>(command, result.flatMapMany(v -> Flux.fromStream(v.entrySet().stream()))));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<ReactiveRedisConnection.BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeyValuePairs().keySet())) {
return super.mSetNX(Mono.just(command));
}
return Mono
.error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for MSETNX command."));
}));
}
}
代码示例来源:origin: line/armeria
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
// Prefetch 1 message because Armeria's HttpRequestSubscriber consumes messages one by one.
return write(Flux.from(body).concatMap(Flux::from, 1));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDiffCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKeys(), "Keys must not be null!");
Flux<ByteBuffer> result = cmd.sdiff(command.getKeys().toArray(new ByteBuffer[0]));
return Mono.just(new CommandResponse<>(command, result));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<ReactiveRedisConnection.NumericResponse<BitOpCommand, Long>> bitOp(Publisher<BitOpCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
keys.add(command.getDestinationKey());
if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
return super.bitOp(Mono.just(command));
}
return Mono
.error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for BITOP command."));
}));
}
代码示例来源:origin: org.springframework/spring-core
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.from(inputStream).
take(1).
concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<CommandResponse<KeyCommand, DataType>> type(Publisher<KeyCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null!");
return cmd.type(command.getKey()).map(LettuceConverters::toDataType)
.map(respValue -> new CommandResponse<>(command, respValue));
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUnionStoreCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");
if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getSourceKeys())) {
return super.zUnionStore(Mono.just(command));
}
return Mono
.error(new InvalidDataAccessApiUsageException("All keys must map to the same slot for ZUNIONSTORE command."));
}));
}
内容来源于网络,如有侵权,请联系作者删除!