reactor.core.publisher.Flux.bufferUntil()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.3k)|赞(0)|评价(0)|浏览(185)

本文整理了Java中reactor.core.publisher.Flux.bufferUntil()方法的一些代码示例,展示了Flux.bufferUntil()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.bufferUntil()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:bufferUntil

Flux.bufferUntil介绍

[英]Collect incoming values into multiple List buffers that will be emitted by the resulting Flux each time the given predicate returns true. Note that the element that triggers the predicate to return true (and thus closes a buffer) is included as last element in the emitted buffer.

On completion, if the latest buffer is non-empty and has not been closed it is emitted. However, such a "partial" buffer isn't emitted in case of onError termination.
[中]将传入值收集到多个列表缓冲区中,每次给定谓词返回true时,这些缓冲区将由结果流量发出。请注意,触发谓词返回true(从而关闭缓冲区)的元素作为发出的缓冲区中的最后一个元素包含。
完成时,如果最新缓冲区非空且尚未关闭,则会发出该缓冲区。但是,在onError终止的情况下,不会发出这样的“部分”缓冲区。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
  Flux<DataBuffer> inputFlux = Flux.from(inputStream)
      .flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
      .bufferUntil(StringDecoder::isEndFrame)
      .map(StringDecoder::joinUntilEndFrame)
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  return super.decode(inputFlux, elementType, mimeType, hints);
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Flux<Object> read(
    ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
  boolean shouldWrap = isServerSentEvent(elementType);
  ResolvableType valueType = (shouldWrap ? elementType.getGeneric() : elementType);
  return stringDecoder.decode(message.getBody(), STRING_TYPE, null, hints)
      .bufferUntil(line -> line.equals(""))
      .concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints));
}

代码示例来源:origin: org.springframework/spring-core

@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
  Flux<DataBuffer> inputFlux = Flux.from(inputStream)
      .flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
      .bufferUntil(StringDecoder::isEndFrame)
      .map(StringDecoder::joinUntilEndFrame)
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  return super.decode(inputFlux, elementType, mimeType, hints);
}

代码示例来源:origin: org.springframework/spring-web

@Override
public Flux<Object> read(
    ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
  boolean shouldWrap = isServerSentEvent(elementType);
  ResolvableType valueType = (shouldWrap ? elementType.getGeneric() : elementType);
  return stringDecoder.decode(message.getBody(), STRING_TYPE, null, hints)
      .bufferUntil(line -> line.equals(""))
      .concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnPredicateError() {
  StepVerifier.create(Flux.just(1, 2, 3)
      .bufferUntil(i -> { if (i == 3) throw new IllegalStateException("boom"); else return false; }))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testBufferPredicateUntilIncludesBoundaryLastAfter() {
  String[] colorSeparated = new String[]{"red", "green", "blue", "#", "green", "green", "#", "blue", "cyan"};
  Flux<List<String>> colors = Flux
      .fromArray(colorSeparated)
      .bufferUntil(val -> val.equals("#"), false)
      .log();
  StepVerifier.create(colors)
        .consumeNextWith(l1 -> Assert.assertThat(l1, contains("red", "green", "blue", "#")))
        .consumeNextWith(l2 -> Assert.assertThat(l2, contains("green", "green", "#")))
        .consumeNextWith(l3 -> Assert.assertThat(l3, contains("blue", "cyan")))
        .expectComplete()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testBufferPredicateUntilIncludesBoundaryLast() {
  String[] colorSeparated = new String[]{"red", "green", "blue", "#", "green", "green", "#", "blue", "cyan"};
  Flux<List<String>> colors = Flux
      .fromArray(colorSeparated)
      .bufferUntil(val -> val.equals("#"))
      .log();
  StepVerifier.create(colors)
        .consumeNextWith(l1 -> Assert.assertThat(l1, contains("red", "green", "blue", "#")))
        .consumeNextWith(l2 -> Assert.assertThat(l2, contains("green", "green", "#")))
        .consumeNextWith(l3 -> Assert.assertThat(l3, contains("blue", "cyan")))
        .expectComplete()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
@SuppressWarnings("unchecked")
public void requestBufferDoesntOverflow() {
  LongAdder requestCallCount = new LongAdder();
  LongAdder totalRequest = new LongAdder();
  Flux<Integer> source = Flux.range(1, 10).hide()
                .doOnRequest(r -> requestCallCount.increment())
                .doOnRequest(totalRequest::add);
  StepVerifier.withVirtualTime(//start with a request for 1 buffer
      () -> source.bufferUntil(i -> i % 3 == 0), 1)
        .expectSubscription()
        .expectNext(Arrays.asList(1, 2, 3))
        .expectNoEvent(Duration.ofSeconds(1))
        .thenRequest(2)
        .expectNext(Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9))
        .expectNoEvent(Duration.ofSeconds(1))
        .thenRequest(3)
        .expectNext(Collections.singletonList(10))
        .expectComplete()
        .verify();
  assertThat(requestCallCount.intValue()).isEqualTo(11); //10 elements then the completion
  assertThat(totalRequest.longValue()).isEqualTo(11L); //ignores the main requests
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void discardOnError() {
    StepVerifier.create(Flux.just(1, 2, 3)
                .concatWith(Mono.error(new IllegalStateException("boom")))
                .bufferUntil(i -> i > 10))
          .expectErrorMessage("boom")
          .verifyThenAssertThat()
          .hasDiscardedExactly(1, 2, 3);
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testBufferPredicateUntilCutBeforeIncludesBoundaryFirst() {
  String[] colorSeparated = new String[]{"red", "green", "blue", "#", "green", "green", "#", "blue", "cyan"};
  Flux<List<String>> colors = Flux
      .fromArray(colorSeparated)
      .bufferUntil(val -> val.equals("#"), true)
      .log();
  StepVerifier.create(colors)
        .thenRequest(1)
        .consumeNextWith(l1 -> Assert.assertThat(l1, contains("red", "green", "blue")))
        .consumeNextWith(l2 -> Assert.assertThat(l2, contains("#", "green", "green")))
        .consumeNextWith(l3 -> Assert.assertThat(l3, contains("#", "blue", "cyan")))
        .expectComplete()
        .verify();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnCancel() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.never())
              .bufferUntil(i -> i > 10))
        .thenAwait(Duration.ofMillis(10))
        .thenCancel()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: apache/servicemix-bundles

@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
  Flux<DataBuffer> inputFlux = Flux.from(inputStream)
      .flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
      .bufferUntil(StringDecoder::isEndFrame)
      .map(StringDecoder::joinUntilEndFrame)
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  return super.decode(inputFlux, elementType, mimeType, hints);
}

代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-core

@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
  Flux<DataBuffer> inputFlux = Flux.from(inputStream)
      .flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
      .bufferUntil(StringDecoder::isEndFrame)
      .map(StringDecoder::joinUntilEndFrame)
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  return super.decode(inputFlux, elementType, mimeType, hints);
}

代码示例来源:origin: apache/servicemix-bundles

@Override
public Flux<Object> read(ResolvableType elementType, ReactiveHttpInputMessage message,
    Map<String, Object> hints) {
  boolean shouldWrap = isServerSentEvent(elementType);
  ResolvableType valueType = (shouldWrap ? elementType.getGeneric() : elementType);
  return stringDecoder.decode(message.getBody(), STRING_TYPE, null, Collections.emptyMap())
      .bufferUntil(line -> line.equals(""))
      .concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints));
}

代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-web

@Override
public Flux<Object> read(
    ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
  boolean shouldWrap = isServerSentEvent(elementType);
  ResolvableType valueType = (shouldWrap ? elementType.getGeneric() : elementType);
  return stringDecoder.decode(message.getBody(), STRING_TYPE, null, hints)
      .bufferUntil(line -> line.equals(""))
      .concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints));
}

相关文章

Flux类方法