本文整理了Java中reactor.core.publisher.Flux.concatWith()
方法的一些代码示例,展示了Flux.concatWith()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.concatWith()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:concatWith
[英]Concatenate emissions of this Flux with the provided Publisher (no interleave).
[中]将此流量的发射与提供的发布服务器连接(无交错)。
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (this.body != null) {
return Mono.error(new IllegalStateException("Multiple calls to writeWith() not supported"));
}
this.body = Flux.just(generateHeaders()).concatWith(body);
// We don't actually want to write (just save the body Flux)
return Mono.empty();
}
代码示例来源:origin: org.springframework/spring-web
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (this.body != null) {
return Mono.error(new IllegalStateException("Multiple calls to writeWith() not supported"));
}
this.body = Flux.just(generateHeaders()).concatWith(body);
// We don't actually want to write (just save the body Flux)
return Mono.empty();
}
代码示例来源:origin: spring-projects/spring-framework
private Mono<Void> writeMultipart(
MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {
byte[] boundary = generateMultipartBoundary();
Map<String, String> params = new HashMap<>(2);
params.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
params.put("charset", getCharset().name());
outputMessage.getHeaders().setContentType(new MediaType(MediaType.MULTIPART_FORM_DATA, params));
LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Encoding " +
(isEnableLoggingRequestDetails() ?
LogFormatUtils.formatValue(map, !traceOn) :
"parts " + map.keySet() + " (content masked)"));
Flux<DataBuffer> body = Flux.fromIterable(map.entrySet())
.concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue()))
.concatWith(Mono.just(generateLastLine(boundary)));
return outputMessage.writeWith(body);
}
代码示例来源:origin: spring-projects/spring-framework
@SuppressWarnings("unchecked")
private <T> Flux<DataBuffer> encodeData(@Nullable T data, ResolvableType valueType,
MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
if (data == null) {
return Flux.empty();
}
if (data instanceof String) {
String text = (String) data;
return Flux.from(encodeText(StringUtils.replace(text, "\n", "\ndata:") + "\n", mediaType, factory));
}
if (this.encoder == null) {
return Flux.error(new CodecException("No SSE encoder configured and the data is not String."));
}
return ((Encoder<T>) this.encoder)
.encode(Mono.just(data), factory, valueType, mediaType, hints)
.concatWith(encodeText("\n", mediaType, factory));
}
代码示例来源:origin: codecentric/spring-boot-admin
public void start() {
this.subscription = this.getEventStore()
.findAll()
.concatWith(this.getEventStore())
.concatMap(this::updateSnapshot)
.subscribe();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void skipUntilByteCountErrorInFlux() {
DataBuffer foo = stringBuffer("foo");
Flux<DataBuffer> flux =
Flux.just(foo).concatWith(Mono.error(new RuntimeException()));
Flux<DataBuffer> result = DataBufferUtils.skipUntilByteCount(flux, 3L);
StepVerifier.create(result)
.expectError(RuntimeException.class)
.verify(Duration.ofSeconds(5));
}
代码示例来源:origin: spring-projects/spring-framework
.concatWith(getRegionSuffix(bufferFactory, boundaryString));
代码示例来源:origin: spring-projects/spring-framework
@Test
public void errorInStream() {
DataBuffer buffer = stringBuffer("{\"id\":1,\"name\":");
Flux<DataBuffer> source = Flux.just(buffer)
.concatWith(Flux.error(new RuntimeException()));
Flux<TokenBuffer> result = Jackson2Tokenizer.tokenize(source, this.jsonFactory, true);
StepVerifier.create(result)
.expectError(RuntimeException.class)
.verify();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void writeWritableByteChannelErrorInFlux() throws Exception {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
Flux<DataBuffer> flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException()));
WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
StepVerifier.create(writeResult)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.expectError()
.verify(Duration.ofSeconds(5));
String result = String.join("", Files.readAllLines(tempFile));
assertEquals("foobar", result);
channel.close();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void readError() {
Flux<DataBuffer> body =
Flux.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n"))
.concatWith(Flux.error(new RuntimeException()));
MockServerHttpRequest request = MockServerHttpRequest.post("/")
.body(body);
Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
request, Collections.emptyMap()).cast(String.class);
StepVerifier.create(data)
.expectNextMatches(elem -> elem.equals("foo\nbar"))
.expectNextMatches(elem -> elem.equals("baz"))
.expectError()
.verify();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void readFormError() {
DataBuffer fooBuffer = stringBuffer("name=value");
Flux<DataBuffer> body =
Flux.just(fooBuffer).concatWith(Flux.error(new RuntimeException()));
MockServerHttpRequest request = request(body);
Flux<MultiValueMap<String, String>> result = this.reader.read(null, request, null);
StepVerifier.create(result)
.expectError()
.verify();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void joinErrors() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
Flux<DataBuffer> flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException()));
Mono<DataBuffer> result = DataBufferUtils.join(flux);
StepVerifier.create(result)
.expectError(RuntimeException.class)
.verify();
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
Flux<DataBuffer> flux =
Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
StepVerifier.create(writeResult)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.expectError(RuntimeException.class)
.verify();
String result = String.join("", Files.readAllLines(tempFile));
assertEquals("foobar", result);
channel.close();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void errorMessage() {
Flux<String> flux = Flux.just("foo")
.concatWith(Mono.error(new IllegalArgumentException(
"Error message")));
StepVerifier.create(flux)
.expectNext("foo")
.expectErrorMessage("Error message")
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void error() {
Flux<String> flux = Flux.just("foo")
.concatWith(Mono.error(new IllegalArgumentException()));
StepVerifier.create(flux)
.expectNext("foo")
.expectError()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnError() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.error(new IllegalStateException("boom")))
.bufferWhen(Mono.delay(Duration.ofSeconds(2)), u -> Mono.never()))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnErrorSkip() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.error(new IllegalStateException("boom")))
.buffer(4, 5))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publisherOfPublisherDelayError2() {
StepVerifier.create(Flux.just(Flux.just(1, 2)
.concatWith(Flux.error(new Exception("test"))),
Flux.just(3, 4))
.concatMap(f -> f))
.expectNext(1, 2)
.verifyErrorMessage("test");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publisherOfPublisherDelayEndError() {
StepVerifier.create(Flux.concatDelayError(Flux.just(Flux.just(1, 2)
.concatWith(Flux.error(new Exception(
"test"))),
Flux.just(3, 4)), false, 128))
.expectNext(1, 2)
.verifyErrorMessage("test");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publisherOfPublisherDelayEndError2() {
StepVerifier.create(Flux.concatDelayError(Flux.just(Flux.just(1, 2)
.concatWith(Flux.error(new Exception(
"test"))),
Flux.just(3, 4)), true, 128))
.expectNext(1, 2, 3, 4)
.verifyErrorMessage("test");
}
内容来源于网络,如有侵权,请联系作者删除!