本文整理了Java中reactor.core.publisher.Flux.next()
方法的一些代码示例,展示了Flux.next()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.next()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:next
[英]Emit only the first item emitted by this Flux, into a new Mono.
[中]仅将此通量发出的第一项发射到新的单声道中。
代码示例来源:origin: spring-projects/spring-framework
private Mono<Resource> getResource(String resourcePath, List<? extends Resource> locations) {
return Flux.fromIterable(locations)
.concatMap(location -> getResource(resourcePath, location))
.next();
}
代码示例来源:origin: spring-projects/spring-framework
private Mono<WebSession> retrieveSession(ServerWebExchange exchange) {
return Flux.fromIterable(getSessionIdResolver().resolveSessionIds(exchange))
.concatMap(this.sessionStore::retrieveSession)
.next();
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
if (!uri.isAbsolute()) {
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
}
return this.httpClient
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()))
.uri(uri.toString())
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
.next();
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
return sendRequest(getRequest, RequestCreator.get(), GetResponse.class, headers) //
.filter(GetResponse::isExists) //
.map(DefaultReactiveElasticsearchClient::getResponseToGetResult) //
.next();
}
代码示例来源:origin: spring-projects/spring-data-examples
public Mono<Integer> run(Integer id) {
return template.inTransaction().execute(action -> {
return lookup(id) //
.flatMap(process -> start(action, process)) //
.flatMap(it -> verify(it)) //
.flatMap(process -> finish(action, process));
}).next().map(Process::getId);
}
代码示例来源:origin: spring-projects/spring-security
public Mono<Authentication> authenticate(Authentication authentication) {
return Flux.fromIterable(this.delegates)
.concatMap(m -> m.authenticate(authentication))
.next();
}
}
代码示例来源:origin: spring-projects/spring-framework
@Override
protected Mono<Void> writeToInternal(ServerWebExchange exchange, Context context) {
MediaType contentType = exchange.getResponse().getHeaders().getContentType();
Locale locale = LocaleContextHolder.getLocale(exchange.getLocaleContext());
Stream<ViewResolver> viewResolverStream = context.viewResolvers().stream();
return Flux.fromStream(viewResolverStream)
.concatMap(viewResolver -> viewResolver.resolveViewName(name(), locale))
.next()
.switchIfEmpty(Mono.error(() ->
new IllegalArgumentException("Could not resolve view with name '" + name() + "'")))
.flatMap(view -> {
List<MediaType> mediaTypes = view.getSupportedMediaTypes();
return view.render(model(),
contentType == null && !mediaTypes.isEmpty() ? mediaTypes.get(0) : contentType,
exchange);
});
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<MainResponse> info(HttpHeaders headers) {
return sendRequest(new MainRequest(), RequestCreator.info(), MainResponse.class, headers) //
.next();
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
return sendRequest(getRequest, RequestCreator.exists(), RawActionResponse.class, headers) //
.map(response -> response.statusCode().is2xxSuccessful()) //
.next();
}
代码示例来源:origin: org.springframework/spring-web
private Mono<WebSession> retrieveSession(ServerWebExchange exchange) {
return Flux.fromIterable(getSessionIdResolver().resolveSessionIds(exchange))
.concatMap(this.sessionStore::retrieveSession)
.next();
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<Boolean> ping(HttpHeaders headers) {
return sendRequest(new MainRequest(), RequestCreator.ping(), RawActionResponse.class, headers) //
.map(response -> response.statusCode().is2xxSuccessful()) //
.onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
return getHttpClient()
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
.websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
.uri(url.toString())
.handle((inbound, outbound) -> {
HttpHeaders responseHeaders = toHttpHeaders(inbound);
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
if (logger.isDebugEnabled()) {
logger.debug("Started session '" + session.getId() + "' for " + url);
}
return handler.handle(session);
})
.doOnRequest(n -> {
if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
})
.next();
}
代码示例来源:origin: org.springframework/spring-web
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
if (!uri.isAbsolute()) {
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
}
return this.httpClient
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()))
.uri(uri.toString())
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
.next();
}
代码示例来源:origin: spring-projects/spring-framework
return parts.next().cast(Object.class);
.next().cast(Object.class);
代码示例来源:origin: spring-projects/spring-framework
@Test // SPR-14992
public void writeAndAutoFlushBeforeComplete() {
Mono<String> result = this.webClient.get()
.uri("/write-and-never-complete")
.retrieve()
.bodyToFlux(String.class)
.next();
StepVerifier.create(result)
.expectNextMatches(s -> s.startsWith("0123456789"))
.expectComplete()
.verify(Duration.ofSeconds(10L));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void sampleMergeMonoTest() throws Exception {
CountDownLatch latch = new CountDownLatch(2);
Flux<Integer> p = Flux.merge(Flux.<Integer>empty().next(), Mono.just(1))
.log("mono");
awaitLatch(p, latch);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void zipOfNull() {
try {
Flux<String> as = Flux.just("x");
Flux<String> bs = Flux.just((String)null);
assertNull(Flux.zip(as, bs).next().block());
}
catch (NullPointerException npe) {
return;
}
assertFalse("Should have failed", true);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void monoNext() {
StepVerifier.create(Flux.just(1, 2, 3).next())
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cancel() {
TestPublisher<String> cancelTester = TestPublisher.create();
MonoProcessor<String> processor = cancelTester.flux()
.next()
.toProcessor();
processor.subscribe();
processor.cancel();
cancelTester.assertCancelled();
}
内容来源于网络,如有侵权,请联系作者删除!