本文整理了Java中reactor.core.publisher.Flux.mergeWith()
方法的一些代码示例,展示了Flux.mergeWith()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.mergeWith()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:mergeWith
[英]Merge data from this Flux and a Publisher into an interleaved merged sequence. Unlike #concatWith(Publisher), inner sources are subscribed to eagerly.
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
[中]将来自此流量和发布服务器的数据合并到交叉合并序列中。与#concatWith(Publisher)不同,内部资源的订阅非常踊跃。
请注意,merge是为使用异步源或有限源而定制的。当处理尚未在专用调度程序上发布的无限源时,必须在其自己的调度程序中隔离该源,因为merge会在订阅另一个源之前尝试将其耗尽。
代码示例来源:origin: codecentric/spring-boot-admin
@GetMapping(path = "/instances/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<InstanceEvent>> eventStream() {
return Flux.from(eventStore).map(event -> ServerSentEvent.builder(event).build()).mergeWith(ping());
}
代码示例来源:origin: spring-projects/spring-data-redis
/**
* @return the {@link Mono} signalling container termination.
*/
public Mono<Void> destroyLater() {
if (connection != null) {
Flux<Void> terminationSignals = null;
while (!subscriptions.isEmpty()) {
Map<ReactiveSubscription, Subscribers> local = new HashMap<>(subscriptions);
List<Mono<Void>> monos = local.keySet().stream() //
.peek(subscriptions::remove) //
.map(ReactiveSubscription::cancel) //
.collect(Collectors.toList());
if (terminationSignals == null) {
terminationSignals = Flux.concat(monos);
} else {
terminationSignals = terminationSignals.mergeWith(Flux.concat(monos));
}
}
if (terminationSignals != null) {
return terminationSignals.collectList()
.doFinally(signalType -> connection.closeLater().subscribeOn(Schedulers.immediate()))
.flatMap(all -> Mono.empty());
}
this.connection = null;
}
return Mono.empty();
}
代码示例来源:origin: spring-projects/spring-framework
@GetMapping("/infinite")
Flux<String> infinite() {
return Flux.just(0, 1).map(l -> "foo " + l)
.mergeWith(Flux.never())
.doOnCancel(() -> cancellation.onComplete());
}
}
代码示例来源:origin: codecentric/spring-boot-admin
@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
return Flux.from(eventStore)
.filter(event -> event.getInstance().equals(InstanceId.of(id)))
.flatMap(event -> registry.getInstance(event.getInstance()))
.map(event -> ServerSentEvent.builder(event).build())
.mergeWith(ping());
}
代码示例来源:origin: codecentric/spring-boot-admin
@GetMapping(path = "/applications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Application>> applicationsStream() {
return Flux.from(eventPublisher)
.flatMap(event -> registry.getInstance(event.getInstance()))
.map(this::getApplicationForInstance)
.flatMap(group -> toApplication(group.getT1(), group.getT2()))
.map(application -> ServerSentEvent.builder(application).build())
.mergeWith(ping());
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<Message<ByteBuffer, ByteBuffer>> receive() {
Flux<Message<ByteBuffer, ByteBuffer>> channelMessages = channelState.receive(() -> commands.observeChannels() //
.filter(message -> channelState.getTargets().contains(message.getChannel())) //
.map(message -> new ChannelMessage<>(message.getChannel(), message.getMessage())));
Flux<Message<ByteBuffer, ByteBuffer>> patternMessages = patternState.receive(() -> commands.observePatterns() //
.filter(message -> patternState.getTargets().contains(message.getPattern())) //
.map(message -> new PatternMessage<>(message.getPattern(), message.getChannel(), message.getMessage())));
return channelMessages.mergeWith(patternMessages);
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
String path = request.getURI().getPath();
switch (path) {
case "/write-and-flush":
return response.writeAndFlushWith(
testInterval(Duration.ofMillis(50), 2)
.map(longValue -> wrap("data" + longValue + "\n", response))
.map(Flux::just)
.mergeWith(Flux.never()));
case "/write-and-complete":
return response.writeWith(
chunks1K().take(64).map(s -> wrap(s, response)));
case "/write-and-never-complete":
// Reactor requires at least 50 to flush, Tomcat/Undertow 8, Jetty 1
return response.writeWith(
chunks1K().take(64).map(s -> wrap(s, response)).mergeWith(Flux.never()));
default:
return response.writeWith(Flux.empty());
}
}
代码示例来源:origin: spring-cloud/spring-cloud-gateway
@Override
public Mono<Void> handle(WebSocketSession session) {
return Flux.never().mergeWith(session.close(CloseStatus.GOING_AWAY)).then();
}
}
代码示例来源:origin: spring-projects/spring-data-redis
private <C, B> Flux<Message<C, B>> doReceive(SerializationPair<C> channelSerializer,
SerializationPair<B> messageSerializer, Mono<ReactiveSubscription> subscription, ByteBuffer[] patterns,
ByteBuffer[] channels) {
Flux<Message<ByteBuffer, ByteBuffer>> messageStream = subscription.flatMapMany(it -> {
Mono<Void> subscribe = subscribe(patterns, channels, it);
MonoProcessor<ChannelMessage<ByteBuffer, ByteBuffer>> terminalProcessor = MonoProcessor.create();
return it.receive().mergeWith(subscribe.then(Mono.defer(() -> {
getSubscribers(it).registered();
return Mono.empty();
}))).doOnCancel(() -> {
Subscribers subscribers = getSubscribers(it);
if (subscribers.unregister()) {
subscriptions.remove(it);
it.unsubscribe().subscribe(v -> terminalProcessor.onComplete(), terminalProcessor::onError);
}
}).mergeWith(terminalProcessor);
});
return messageStream
.map(message -> readMessage(channelSerializer.getReader(), messageSerializer.getReader(), message));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldPropagateOnCompleteWithMergedElementsCorrectly() {
Flux<String> switchTransformed = Flux.empty()
.switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf)
.mergeWith(Flux.just("1", "2", "3")));
StepVerifier.create(switchTransformed)
.expectNext("1", "2", "3")
.expectComplete()
.verify(Duration.ofSeconds(10));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void noStackOverflow() {
int n = 5000;
Flux<Integer> source = Flux.just(1);
Flux<Integer> result = source;
for (int i = 0; i < n; i++) {
result = result.mergeWith(source);
}
AssertSubscriber<Integer> ts = AssertSubscriber.create();
result.subscribe(ts);
ts.assertValueCount(n + 1)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void mergeWithNoStackoverflow() {
int n = 5000;
Flux<Integer> source = Flux.just(1);
Flux<Integer> result = source;
for (int i = 0; i < n; i++) {
result = result.mergeWith(source);
}
AssertSubscriber<Integer> ts = AssertSubscriber.create();
result.subscribe(ts);
ts.assertValueCount(n + 1)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void dontBreakFluxArrayFlatmap() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.just(1, 2).flatMap(Flux::just).mergeWith(Flux.just(3))
.subscribe(ts);
ts.assertValues(1, 2, 3)
.assertNoError()
.assertComplete();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void noStackOverflow2() {
int n = 5000;
Flux<Integer> source = Flux.just(1, 2).flatMap(Flux::just);
Flux<Integer> add = Flux.just(3);
Flux<Integer> result = source;
for (int i = 0; i < n; i++) {
result = result.mergeWith(add);
}
AssertSubscriber<Integer> ts = AssertSubscriber.create();
result.subscribe(ts);
ts.assertValueCount(n + 2)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void noStackOverflow3() {
int n = 5000;
Flux<Flux<Integer>> source = Flux.just(Flux.just(1), Flux.just(2));
Flux<Flux<Integer>> add = Flux.just(Flux.just(3));
Flux<Flux<Integer>> result = source;
for (int i = 0; i < n; i++) {
result = result.mergeWith(add);
}
AssertSubscriber<Object> ts = AssertSubscriber.create();
result.subscribe(ts);
ts.assertValueCount(n + 2)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: rsocket/rsocket-java
private static Flux<Long> input() {
Flux<Long> interval = Flux.interval(Duration.ofMillis(1)).onBackpressureDrop();
for (int i = 0; i < 10; i++) {
interval = interval.mergeWith(interval);
}
return interval;
}
代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations
private static Mono<List<DomainSummary>> listAvailableDomains(CloudFoundryClient cloudFoundryClient, String organizationId) {
return requestListPrivateDomains(cloudFoundryClient, organizationId)
.map(DefaultApplications::toDomain)
.mergeWith(requestListSharedDomains(cloudFoundryClient)
.map(DefaultApplications::toDomain))
.collectList();
}
代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations
@Override
public Flux<Domain> list() {
return this.cloudFoundryClient
.flatMapMany(cloudFoundryClient -> requestListPrivateDomains(cloudFoundryClient)
.map(DefaultDomains::toDomain)
.mergeWith(requestListSharedDomains(cloudFoundryClient)
.map(DefaultDomains::toDomain)))
.transform(OperationsLogging.log("List Domains"))
.checkpoint();
}
代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations
private static Mono<List<String>> getDomainNames(CloudFoundryClient cloudFoundryClient, String organizationId) {
return requestListPrivateDomains(cloudFoundryClient, organizationId)
.map(resource -> resource.getEntity().getName())
.mergeWith(requestListSharedDomains(cloudFoundryClient)
.map(resource -> resource.getEntity().getName()))
.collectList();
}
代码示例来源:origin: mohitsinha/tutorials
@Test
public void interleave() {
Flux<Long> delay = Flux.interval(Duration.ofMillis(5));
Flux<String> alphabetsWithDelay = Flux.just("A", "B").zipWith(delay, (s, l) -> s);
Flux<String> alphabetsWithoutDelay = Flux.just("C", "D");
Flux<String> interleavedFlux = alphabetsWithDelay.mergeWith(alphabetsWithoutDelay);
StepVerifier.create(interleavedFlux).expectNext("C", "D", "A", "B").verifyComplete();
Flux<String> nonInterleavedFlux = alphabetsWithDelay.concatWith(alphabetsWithoutDelay);
StepVerifier.create(nonInterleavedFlux).expectNext("A", "B", "C", "D").verifyComplete();
}
内容来源于网络,如有侵权,请联系作者删除!