reactor.core.publisher.Flux.concatWith()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(668)

本文整理了Java中reactor.core.publisher.Flux.concatWith()方法的一些代码示例,展示了Flux.concatWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.concatWith()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:concatWith

Flux.concatWith介绍

[英]Concatenate emissions of this Flux with the provided Publisher (no interleave).
[中]将此流量的发射与提供的发布服务器连接(无交错)。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
  if (this.body != null) {
    return Mono.error(new IllegalStateException("Multiple calls to writeWith() not supported"));
  }
  this.body = Flux.just(generateHeaders()).concatWith(body);
  // We don't actually want to write (just save the body Flux)
  return Mono.empty();
}

代码示例来源:origin: org.springframework/spring-web

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
  if (this.body != null) {
    return Mono.error(new IllegalStateException("Multiple calls to writeWith() not supported"));
  }
  this.body = Flux.just(generateHeaders()).concatWith(body);
  // We don't actually want to write (just save the body Flux)
  return Mono.empty();
}

代码示例来源:origin: spring-projects/spring-framework

private Mono<Void> writeMultipart(
    MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {
  byte[] boundary = generateMultipartBoundary();
  Map<String, String> params = new HashMap<>(2);
  params.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
  params.put("charset", getCharset().name());
  outputMessage.getHeaders().setContentType(new MediaType(MediaType.MULTIPART_FORM_DATA, params));
  LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Encoding " +
      (isEnableLoggingRequestDetails() ?
          LogFormatUtils.formatValue(map, !traceOn) :
          "parts " + map.keySet() + " (content masked)"));
  Flux<DataBuffer> body = Flux.fromIterable(map.entrySet())
      .concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue()))
      .concatWith(Mono.just(generateLastLine(boundary)));
  return outputMessage.writeWith(body);
}

代码示例来源:origin: spring-projects/spring-framework

@SuppressWarnings("unchecked")
private <T> Flux<DataBuffer> encodeData(@Nullable T data, ResolvableType valueType,
    MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
  if (data == null) {
    return Flux.empty();
  }
  if (data instanceof String) {
    String text = (String) data;
    return Flux.from(encodeText(StringUtils.replace(text, "\n", "\ndata:") + "\n", mediaType, factory));
  }
  if (this.encoder == null) {
    return Flux.error(new CodecException("No SSE encoder configured and the data is not String."));
  }
  return ((Encoder<T>) this.encoder)
      .encode(Mono.just(data), factory, valueType, mediaType, hints)
      .concatWith(encodeText("\n", mediaType, factory));
}

代码示例来源:origin: codecentric/spring-boot-admin

public void start() {
  this.subscription = this.getEventStore()
              .findAll()
              .concatWith(this.getEventStore())
              .concatMap(this::updateSnapshot)
              .subscribe();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void skipUntilByteCountErrorInFlux() {
  DataBuffer foo = stringBuffer("foo");
  Flux<DataBuffer> flux =
      Flux.just(foo).concatWith(Mono.error(new RuntimeException()));
  Flux<DataBuffer> result = DataBufferUtils.skipUntilByteCount(flux, 3L);
  StepVerifier.create(result)
      .expectError(RuntimeException.class)
      .verify(Duration.ofSeconds(5));
}

代码示例来源:origin: spring-projects/spring-framework

.concatWith(getRegionSuffix(bufferFactory, boundaryString));

代码示例来源:origin: spring-projects/spring-framework

@Test
public void errorInStream() {
  DataBuffer buffer = stringBuffer("{\"id\":1,\"name\":");
  Flux<DataBuffer> source = Flux.just(buffer)
      .concatWith(Flux.error(new RuntimeException()));
  Flux<TokenBuffer> result = Jackson2Tokenizer.tokenize(source, this.jsonFactory, true);
  StepVerifier.create(result)
      .expectError(RuntimeException.class)
      .verify();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void writeWritableByteChannelErrorInFlux() throws Exception {
  DataBuffer foo = stringBuffer("foo");
  DataBuffer bar = stringBuffer("bar");
  Flux<DataBuffer> flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException()));
  WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE);
  Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
  StepVerifier.create(writeResult)
      .consumeNextWith(stringConsumer("foo"))
      .consumeNextWith(stringConsumer("bar"))
      .expectError()
      .verify(Duration.ofSeconds(5));
  String result = String.join("", Files.readAllLines(tempFile));
  assertEquals("foobar", result);
  channel.close();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void readError() {
  Flux<DataBuffer> body =
      Flux.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n"))
          .concatWith(Flux.error(new RuntimeException()));
  MockServerHttpRequest request = MockServerHttpRequest.post("/")
      .body(body);
  Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
      request, Collections.emptyMap()).cast(String.class);
  StepVerifier.create(data)
      .expectNextMatches(elem -> elem.equals("foo\nbar"))
      .expectNextMatches(elem -> elem.equals("baz"))
      .expectError()
      .verify();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void readFormError() {
  DataBuffer fooBuffer = stringBuffer("name=value");
  Flux<DataBuffer> body =
      Flux.just(fooBuffer).concatWith(Flux.error(new RuntimeException()));
  MockServerHttpRequest request = request(body);
  Flux<MultiValueMap<String, String>> result = this.reader.read(null, request, null);
  StepVerifier.create(result)
      .expectError()
      .verify();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void joinErrors() {
  DataBuffer foo = stringBuffer("foo");
  DataBuffer bar = stringBuffer("bar");
  Flux<DataBuffer> flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException()));
  Mono<DataBuffer> result = DataBufferUtils.join(flux);
  StepVerifier.create(result)
      .expectError(RuntimeException.class)
      .verify();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
  DataBuffer foo = stringBuffer("foo");
  DataBuffer bar = stringBuffer("bar");
  Flux<DataBuffer> flux =
      Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));
  AsynchronousFileChannel channel =
      AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);
  Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
  StepVerifier.create(writeResult)
      .consumeNextWith(stringConsumer("foo"))
      .consumeNextWith(stringConsumer("bar"))
      .expectError(RuntimeException.class)
      .verify();
  String result = String.join("", Files.readAllLines(tempFile));
  assertEquals("foobar", result);
  channel.close();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorMessage() {
  Flux<String> flux = Flux.just("foo")
              .concatWith(Mono.error(new IllegalArgumentException(
                  "Error message")));
  StepVerifier.create(flux)
        .expectNext("foo")
        .expectErrorMessage("Error message")
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void error() {
  Flux<String> flux = Flux.just("foo")
              .concatWith(Mono.error(new IllegalArgumentException()));
  StepVerifier.create(flux)
        .expectNext("foo")
        .expectError()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnError() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.error(new IllegalStateException("boom")))
              .bufferWhen(Mono.delay(Duration.ofSeconds(2)), u -> Mono.never()))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnErrorSkip() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.error(new IllegalStateException("boom")))
              .buffer(4, 5))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void publisherOfPublisherDelayError2() {
  StepVerifier.create(Flux.just(Flux.just(1, 2)
                   .concatWith(Flux.error(new Exception("test"))),
      Flux.just(3, 4))
              .concatMap(f -> f))
        .expectNext(1, 2)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void publisherOfPublisherDelayEndError() {
  StepVerifier.create(Flux.concatDelayError(Flux.just(Flux.just(1, 2)
                              .concatWith(Flux.error(new Exception(
                                  "test"))),
      Flux.just(3, 4)), false, 128))
        .expectNext(1, 2)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void publisherOfPublisherDelayEndError2() {
  StepVerifier.create(Flux.concatDelayError(Flux.just(Flux.just(1, 2)
                              .concatWith(Flux.error(new Exception(
                                  "test"))),
      Flux.just(3, 4)), true, 128))
        .expectNext(1, 2, 3, 4)
        .verifyErrorMessage("test");
}

相关文章

Flux类方法