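This article collects code examples for the Java method reactor.core.publisher.Flux.flatMap() and shows how Flux.flatMap() is used in practice. The examples are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of the Flux.flatMap() method are as follows: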
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: flatMap
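Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allows them to interleave.
There are three dimensions to this operator that can be compared with #flatMapSequential(Function) and #concatMap(Function): generation of inners and subscription (this operator subscribes to its inners eagerly), ordering of the flattened values (this operator does not necessarily preserve the original ordering, as inner elements are flattened as they arrive), and interleaving (this operator lets values from different inners interleave, similar to merging the inner sequences).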
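The ordering difference described above can be sketched as follows. This is a minimal illustration rather than anything taken from the listed projects: it assumes reactor-core is on the classpath, and the class name FlatMapOrderingSketch and the helper slowEcho are hypothetical. Smaller values are delayed longer, so the inner publishers complete in reverse order of subscription.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapOrderingSketch {

    // Hypothetical inner publisher: 1 is delayed 300 ms, 2 is delayed 200 ms, 3 is delayed 100 ms.
    static Mono<Integer> slowEcho(int i) {
        return Mono.just(i).delayElement(Duration.ofMillis(400L - 100L * i));
    }

    // Eager subscription, values emitted as they arrive.
    static List<Integer> viaFlatMap() {
        return Flux.just(1, 2, 3).flatMap(FlatMapOrderingSketch::slowEcho).collectList().block();
    }

    // Eager subscription, values replayed in source order.
    static List<Integer> viaFlatMapSequential() {
        return Flux.just(1, 2, 3).flatMapSequential(FlatMapOrderingSketch::slowEcho).collectList().block();
    }

    // One inner at a time, so source order is preserved.
    static List<Integer> viaConcatMap() {
        return Flux.just(1, 2, 3).concatMap(FlatMapOrderingSketch::slowEcho).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:           " + viaFlatMap());           // typically [3, 2, 1]
        System.out.println("flatMapSequential: " + viaFlatMapSequential()); // [1, 2, 3]
        System.out.println("concatMap:         " + viaConcatMap());         // [1, 2, 3]
    }
}
```

With these delays, concatMap takes roughly the sum of the delays because each inner is subscribed only after the previous one completes, while the other two operators take roughly the longest single delay.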
Code example source: origin: spring-projects/spring-framework
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
    return flux -> flux
            // count + 1 advances the attempt counter; a post-increment (count++) would return the old value
            .scan(1, (count, element) -> count + 1)
            .flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
                    .map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
                    .orElse(Mono.empty()));
}
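The scan-then-flatMap pattern in this snippet can be tried in isolation. The sketch below is an assumption-laden stand-in, not the Spring implementation: timeToNextAttempt is a hypothetical replacement for ReconnectStrategy.getTimeToNextAttempt, the delay values are returned directly instead of being passed to Mono.delay, and the accumulator uses count + 1 so that the counter advances on every element.

```java
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class AttemptCounterSketch {

    // Hypothetical strategy: a delay for the first three attempts, then null to give up.
    static Long timeToNextAttempt(int attempt) {
        return attempt <= 3 ? Long.valueOf(attempt * 10L) : null;
    }

    // scan emits the seed first, then one accumulated value per source element.
    static List<Integer> attempts() {
        return Flux.just("e1", "e2", "e3")
                .scan(1, (count, element) -> count + 1)
                .collectList()
                .block();
    }

    // A null delay becomes Mono.empty(), so flatMap emits nothing for that attempt.
    static List<Long> delays() {
        return Flux.just("e1", "e2", "e3", "e4")
                .scan(1, (count, element) -> count + 1)
                .flatMap(attempt -> Optional.ofNullable(timeToNextAttempt(attempt))
                        .map(Mono::just)
                        .orElse(Mono.empty()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(attempts()); // [1, 2, 3, 4]
        System.out.println(delays());   // [10, 20, 30]
    }
}
```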
Code example source: origin: spring-projects/spring-framework
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
    // Flatten the publisher of publishers into a single stream of buffers
    return writeWith(Flux.from(body).flatMap(p -> p));
}
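Passing the element itself as the inner publisher, as in flatMap(p -> p), flattens a publisher of publishers. A minimal sketch, assuming reactor-core on the classpath and using String elements in place of DataBuffer (the class name is hypothetical):

```java
import java.util.List;

import reactor.core.publisher.Flux;

final class FlattenNestedSketch {

    // Each outer element is already a publisher, so the mapper returns it unchanged.
    static List<String> flatten() {
        Flux<Flux<String>> nested = Flux.just(Flux.just("a", "b"), Flux.just("c"));
        return nested.flatMap(p -> p).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(flatten()); // [a, b, c]
    }
}
```

The inner sequences here emit synchronously, so the output keeps source order; with asynchronous inners, flatMap may interleave their elements.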
Code example source: origin: codecentric/spring-boot-admin
@Override
public Mono<Void> notify(InstanceEvent event) {
    // Handle errors per delegate so that one failing notifier does not cancel the others
    return Flux.fromIterable(delegates).flatMap(d -> d.notify(event).onErrorResume(error -> {
        log.warn("Unexpected exception while triggering notifications. Notification might not be sent.", error);
        return Mono.empty();
    })).then();
}
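The notify method attaches onErrorResume to each inner publisher rather than to the outer Flux, so a failure in one delegate is swallowed without terminating the merged sequence. A minimal sketch of that idea, assuming reactor-core on the classpath; the send helper and its failure for "b" are hypothetical:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class IsolatedErrorsSketch {

    // Hypothetical delegate call: fails for "b", succeeds otherwise.
    static Mono<String> send(String target) {
        return "b".equals(target)
                ? Mono.error(new IllegalStateException("cannot reach " + target))
                : Mono.just("sent:" + target);
    }

    // The fallback is applied inside the mapper, so only the failing inner is replaced.
    static List<String> notifyAllTargets() {
        return Flux.just("a", "b", "c")
                .flatMap(target -> send(target).onErrorResume(error -> Mono.empty()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(notifyAllTargets()); // [sent:a, sent:c]
    }
}
```

Without the inner onErrorResume, the error from "b" would terminate the whole flattened sequence.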
Code example source: origin: spring-projects/spring-framework
/**
 * Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
 * @param dataBuffers the source data buffers
 * @param jsonFactory the factory to use
 * @param tokenizeArrayElements if {@code true} and the "top level" JSON
 * object is an array, each element is returned individually, immediately
 * after it is received.
 * @return the result token buffers
 */
public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
        boolean tokenizeArrayElements) {
    try {
        JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
        Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, tokenizeArrayElements);
        // Three-argument flatMap: maps onNext, onError, and onComplete signals to publishers
        return dataBuffers.flatMap(tokenizer::tokenize, Flux::error, tokenizer::endOfInput);
    }
    catch (IOException ex) {
        return Flux.error(ex);
    }
}
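The tokenize method uses the three-argument overload of flatMap, which maps the onNext, onError, and onComplete signals to publishers. The sketch below illustrates that overload with plain integers, assuming reactor-core on the classpath; the class name and the sentinel values -1 and 99 are hypothetical, and unlike the snippet above (which propagates the error with Flux::error) the error mapper here substitutes a fallback value.

```java
import java.util.List;

import reactor.core.publisher.Flux;

final class SignalMappingSketch {

    // Maps each element, a terminal error, and normal completion to a publisher.
    static List<Integer> mapSignals(Flux<Integer> source) {
        return source.<Integer>flatMap(
                        i -> Flux.just(i * 10),   // onNext
                        error -> Flux.just(-1),   // onError
                        () -> Flux.just(99))      // onComplete
                .collectList()
                .block();
    }

    static Flux<Integer> failingSource() {
        return Flux.just(1, 2).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    public static void main(String[] args) {
        System.out.println(mapSignals(Flux.just(1, 2)));  // [10, 20, 99]
        System.out.println(mapSignals(failingSource()));  // [10, 20, -1]
    }
}
```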
Code example source: origin: codecentric/spring-boot-admin
@DeleteMapping(path = "/applications/{name}")
public Mono<ResponseEntity<Void>> unregister(@PathVariable("name") String name) {
    log.debug("Unregister application with name '{}'", name);
    return registry.getInstances(name)
            .flatMap(instance -> registry.deregister(instance.getId()))
            .collectList()
            .map(deregistered -> !deregistered.isEmpty()
                    ? ResponseEntity.noContent().build()
                    : ResponseEntity.notFound().build());
}
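The unregister handler relies on collectList() yielding an empty list when flatMap produced no elements. A minimal sketch of that flow without Spring, assuming reactor-core on the classpath and Java 9 or later; the in-memory REGISTRY, the helper methods, and the integer status codes standing in for ResponseEntity are all hypothetical:

```java
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class DeregisterSketch {

    // Hypothetical registry: application name -> instance ids.
    static final Map<String, List<String>> REGISTRY = Map.of("billing", List.of("i-1", "i-2"));

    static Flux<String> getInstances(String name) {
        return Flux.fromIterable(REGISTRY.getOrDefault(name, List.of()));
    }

    // Hypothetical deregistration that echoes the removed id.
    static Mono<String> deregister(String id) {
        return Mono.just(id);
    }

    // 204 if at least one instance was deregistered, 404 otherwise.
    static Mono<Integer> unregister(String name) {
        return getInstances(name)
                .flatMap(DeregisterSketch::deregister)
                .collectList()
                .map(deregistered -> deregistered.isEmpty() ? 404 : 204);
    }

    public static void main(String[] args) {
        System.out.println(unregister("billing").block()); // 204
        System.out.println(unregister("unknown").block()); // 404
    }
}
```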
Code example source: origin: codecentric/spring-boot-admin
@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
    return publisher.subscribeOn(Schedulers.newSingle("endpoint-detector"))
            .filter(event -> event instanceof InstanceStatusChangedEvent ||
                    event instanceof InstanceRegistrationUpdatedEvent)
            .flatMap(this::detectEndpoints);
}
Code example source: origin: spring-projects/spring-framework
private Flux<?> decodePartValues(Flux<Part> parts, MethodParameter elementType, BindingContext bindingContext,
        ServerWebExchange exchange, boolean isRequired) {
    return parts.flatMap(part -> {
        ServerHttpRequest partRequest = new PartServerHttpRequest(exchange.getRequest(), part);
        ServerWebExchange partExchange = exchange.mutate().request(partRequest).build();
        if (logger.isDebugEnabled()) {
            logger.debug(exchange.getLogPrefix() + "Decoding part '" + part.name() + "'");
        }
        return readBody(elementType, isRequired, bindingContext, partExchange);
    });
}
Code example source: origin: spring-projects/spring-framework
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
    Flux<ContentChunk> chunks = Flux.from(body)
            .flatMap(Function.identity())
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
            .map(this::toContentChunk);
    ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
    this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
    return doCommit(this::completes);
}
Code example source: origin: codecentric/spring-boot-admin
public void start() {
    this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
            .log(log.getName(), Level.FINEST)
            .doOnSubscribe(s -> log.debug("Started reminders"))
            .flatMap(i -> this.sendReminders())
            .onErrorContinue((ex, value) -> log.warn("Unexpected error while sending reminders", ex))
            .subscribe();
}
Code example source: origin: spring-projects/spring-framework
@Override
protected Mono<Void> writeAndFlushWithInternal(
        Publisher<? extends Publisher<? extends DataBuffer>> bodyWithFlush) {
    return Flux.from(bodyWithFlush).flatMap(body ->
            Flux.from(body).map(b -> {
                this.body.add(b);
                return b;
            })
    ).then();
}
Code example source: origin: codecentric/spring-boot-admin
@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
    return Flux.from(eventStore)
            .filter(event -> event.getInstance().equals(InstanceId.of(id)))
            .flatMap(event -> registry.getInstance(event.getInstance()))
            .map(event -> ServerSentEvent.builder(event).build())
            .mergeWith(ping());
}
Code example source: origin: codecentric/spring-boot-admin
@GetMapping(path = "/applications", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Application> applications() {
    return registry.getInstances()
            .filter(Instance::isRegistered)
            .groupBy(instance -> instance.getRegistration().getName())
            .flatMap(grouped -> toApplication(grouped.key(), grouped));
}
Code example source: origin: codecentric/spring-boot-admin
@Override
public Flux<Instance> findAll() {
    return eventStore.findAll()
            .groupBy(InstanceEvent::getInstance)
            .flatMap(f -> f.reduce(Instance.create(f.key()), Instance::apply));
}
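Both applications() and findAll() combine groupBy with flatMap: each GroupedFlux is reduced to a single value, and flatMap merges the per-group results. A minimal sketch of that pattern, assuming reactor-core on the classpath and Java 9 or later; the "instanceId:delta" strings and the class name are hypothetical stand-ins for the event types used above:

```java
import java.util.Map;

import reactor.core.publisher.Flux;

final class GroupAndReduceSketch {

    // Groups hypothetical "instanceId:delta" events by id and sums the deltas per group.
    static Map<String, Integer> totalsPerInstance() {
        return Flux.just("a:1", "b:10", "a:2", "b:20", "c:5")
                .map(event -> event.split(":"))
                .groupBy(parts -> parts[0], parts -> Integer.parseInt(parts[1]))
                .flatMap(group -> group.reduce(0, Integer::sum)
                        .map(total -> Map.entry(group.key(), total)))
                .collectMap(Map.Entry::getKey, Map.Entry::getValue)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(totalsPerInstance()); // contains a=3, b=30, c=5 (map iteration order is not guaranteed)
    }
}
```

Each group only completes when the source completes, so flatMap has to subscribe to all groups concurrently; this works here because the number of groups stays well below flatMap's default concurrency.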
Code example source: origin: codecentric/spring-boot-admin
@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
    return publisher.subscribeOn(Schedulers.newSingle("status-updater"))
            .filter(event -> event instanceof InstanceRegisteredEvent ||
                    event instanceof InstanceRegistrationUpdatedEvent)
            .flatMap(event -> updateStatus(event.getInstance()));
}
Code example source: origin: spring-projects/spring-framework
@Override
public void decodeToMono() {
    List<Pojo> expected = Arrays.asList(pojo1, pojo2);
    Flux<DataBuffer> input = Flux.just(expected)
            .map(this::writeObject)
            .flatMap(this::dataBuffer);
    ResolvableType elementType = ResolvableType.forClassWithGenerics(List.class, Pojo.class);
    testDecodeToMono(input, elementType, step -> step
            .expectNext(expected)
            .expectComplete()
            .verify(), null, null);
}
Code example source: origin: spring-projects/spring-framework
@Override
public void decode() {
    Flux<DataBuffer> input = Flux.just(this.pojo1, this.pojo2)
            .map(this::writeObject)
            .flatMap(this::dataBuffer);
    testDecodeAll(input, Pojo.class, step -> step
            .expectNext(pojo1)
            .expectNext(pojo2)
            .verifyComplete());
}
Code example source: origin: spring-projects/spring-framework
@Test
public void decodeChunksToMono() {
    // Split the serialized message into two chunks to simulate fragmented input
    byte[] full = this.testMsg1.toByteArray();
    byte[] chunk1 = Arrays.copyOfRange(full, 0, full.length / 2);
    byte[] chunk2 = Arrays.copyOfRange(full, chunk1.length, full.length);
    Flux<DataBuffer> input = Flux.just(chunk1, chunk2)
            .flatMap(bytes -> Mono.defer(() -> {
                DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(bytes.length);
                dataBuffer.write(bytes);
                return Mono.just(dataBuffer);
            }));
    testDecodeToMono(input, Msg.class, step -> step
            .expectNext(this.testMsg1)
            .verifyComplete());
}
Code example source: origin: spring-projects/spring-framework
@Override
public void encode() throws Exception {
    Flux<DataBuffer> input = Flux.just(this.fooBytes, this.barBytes)
            .flatMap(bytes -> Mono.defer(() -> {
                DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(bytes.length);
                dataBuffer.write(bytes);
                return Mono.just(dataBuffer);
            }));
    testEncodeAll(input, DataBuffer.class, step -> step
            .consumeNextWith(expectBytes(this.fooBytes))
            .consumeNextWith(expectBytes(this.barBytes))
            .verifyComplete());
}
Code example source: origin: spring-projects/spring-framework
@Override
@Test
public void decode() {
    Flux<DataBuffer> input = Flux.just(this.testMsg1, this.testMsg2)
            .flatMap(msg -> Mono.defer(() -> {
                DataBuffer buffer = this.bufferFactory.allocateBuffer();
                try {
                    msg.writeDelimitedTo(buffer.asOutputStream());
                    return Mono.just(buffer);
                }
                catch (IOException e) {
                    // Release the buffer on failure to avoid leaking it
                    release(buffer);
                    return Mono.error(e);
                }
            }));
    testDecodeAll(input, Msg.class, step -> step
            .expectNext(this.testMsg1)
            .expectNext(this.testMsg2)
            .verifyComplete());
}