reactor.core.publisher.Flux.bufferWhen(): usage and code examples

x33g5p2x, reposted on 2022-01-19 under Other

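This article collects code examples for the Java method reactor.core.publisher.Flux.bufferWhen() and shows how Flux.bufferWhen() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.bufferWhen() method are as follows: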
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: bufferWhen

Flux.bufferWhen overview

Collects incoming values into multiple List buffers, each started when an opening companion Publisher emits. Each buffer lasts until its corresponding closing companion Publisher emits, which releases the buffer to the resulting Flux.

When Open signals strictly do not overlap Close signals: dropping buffers. Values that arrive while no buffer is open are discarded (the green marbles in the operator's marble diagram).

When Open signals are strictly more frequent than Close signals: overlapping buffers. A value can land in more than one buffer (the second and third buffers in the marble diagram).

When Open signals are exactly coordinated with Close signals: exact buffers.
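
The three regimes described above (dropping, overlapping, exact) can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions, not Reactor's implementation: the class BufferWhenModel and its buffers method are hypothetical names, the source is assumed to emit value i at time (i - 1) * periodMs, a buffer is assumed to open every openEveryMs starting at time 0 and to close closeAfterMs later, and a value is assumed to belong to a buffer when open <= t < close. In the real operator, the outcome of exact timing ties depends on scheduling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the buffering regimes; it does not use Reactor.
final class BufferWhenModel {

    /**
     * Models a source that emits 1..count, one value every periodMs starting at time 0,
     * buffered by windows that open every openEveryMs and close closeAfterMs after opening.
     * Buffers only open while the source is still emitting; all opened buffers are returned.
     */
    static List<List<Integer>> buffers(int count, long periodMs, long openEveryMs, long closeAfterMs) {
        if (periodMs <= 0 || openEveryMs <= 0 || closeAfterMs <= 0) {
            throw new IllegalArgumentException("durations must be positive");
        }
        List<List<Integer>> result = new ArrayList<>();
        long lastEmission = (count - 1) * periodMs;
        for (long open = 0; open <= lastEmission; open += openEveryMs) {
            long close = open + closeAfterMs;
            List<Integer> buffer = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                long t = (i - 1) * periodMs;
                // A value joins every buffer that is open at its emission time.
                if (t >= open && t < close) {
                    buffer.add(i);
                }
            }
            result.add(buffer);
        }
        return result;
    }

    public static void main(String[] args) {
        // Opens more frequent than closes: overlapping buffers.
        System.out.println(buffers(8, 25, 100, 200)); // [[1, 2, 3, 4, 5, 6, 7, 8], [5, 6, 7, 8]]
        // Closes before the next open: dropping buffers (5..8 fall in the gap).
        System.out.println(buffers(8, 25, 200, 100)); // [[1, 2, 3, 4]]
        // Opens coordinated with closes: exact buffers.
        System.out.println(buffers(8, 25, 100, 100)); // [[1, 2, 3, 4], [5, 6, 7, 8]]
    }
}
```

In the overlapping case the values 5 to 8 appear in two buffers, while in the dropping case they appear in none, which is the behaviour the bufferedCanCompleteIfOpenNeverCompletesOverlapping and bufferedCanCompleteIfOpenNeverCompletesDropping tests further down exercise against the real operator.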

Code examples

Code example source: reactor/reactor-core

/**
 * Collect incoming values into multiple {@link List} buffers started each time an opening
 * companion {@link Publisher} emits. Each buffer will last until the corresponding
 * closing companion {@link Publisher} emits, thus releasing the buffer to the resulting {@link Flux}.
 * <p>
 * When Open signal is strictly not overlapping Close signal : dropping buffers (see green marbles in diagram below).
 * <p>
 * When Open signal is strictly more frequent than Close signal : overlapping buffers (see second and third buffers in diagram below).
 * <p>
 * When Open signal is exactly coordinated with Close signal : exact buffers
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferWhen.svg" alt="">
 *
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 * It DOES NOT provide strong guarantees in the case of overlapping buffers, as elements
 * might get discarded too early (from the first of two overlapping buffers for instance).
 *
 * @param bucketOpening a companion {@link Publisher} to subscribe for buffer creation signals.
 * @param closeSelector a factory that, given a buffer opening signal, returns a companion
 * {@link Publisher} to subscribe to for buffer closure and emission signals.
 * @param <U> the element type of the buffer-opening sequence
 * @param <V> the element type of the buffer-closing sequence
 *
 * @return a microbatched {@link Flux} of {@link List} delimited by an opening {@link Publisher} and a relative
 * closing {@link Publisher}
 */
public final <U, V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening,
    Function<? super U, ? extends Publisher<V>> closeSelector) {
  return bufferWhen(bucketOpening, closeSelector, listSupplier());
}

Code example source: reactor/reactor-core

@Test
public void immediateOpen() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .bufferWhen(Mono.just("OPEN"), u -> Mono.delay(Duration.ofMillis(100)))
              .flatMapIterable(Function.identity()))
        .expectNext(1, 2, 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void bufferedCanCompleteIfOpenNeverCompletesOverlapping() {
  //this test ensures that overlapping buffers will complete if the source is exhausted before the open publisher finishes
  Mono<Integer> buffered = Flux.range(1, 200)
                 .delayElements(Duration.ofMillis(25))
                 .bufferWhen(Flux.interval(Duration.ZERO, Duration.ofMillis(100)),
                     open -> Mono.delay(Duration.ofMillis(200)))
                 .log(LOGGER, Level.FINE, false)
                 .reduce(new HashSet<Integer>(), (set, buffer) -> { set.addAll(buffer); return set;})
                 .map(HashSet::size);
  StepVerifier.create(buffered)
        .expectNext(200)
        .expectComplete()
        .verify(Duration.ofSeconds(10));
}

Code example source: reactor/reactor-core

@Test
public void bufferedCanCompleteIfOpenNeverCompletesDropping() {
  //this test ensures that dropping buffers will complete if the source is exhausted before the open publisher finishes
  Mono<Integer> buffered = Flux.range(1, 200)
                 .delayElements(Duration.ofMillis(25))
                 .bufferWhen(Flux.interval(Duration.ZERO, Duration.ofMillis(200)),
                     open -> Mono.delay(Duration.ofMillis(100)))
                 .log(LOGGER, Level.FINE, false)
                 .reduce(new HashSet<Integer>(), (set, buffer) -> { set.addAll(buffer); return set;})
                 .map(HashSet::size);
  StepVerifier.create(buffered)
        .assertNext(size -> assertThat(size).as("approximate size with drops").isBetween(80, 110))
        .expectComplete()
        .verify(Duration.ofSeconds(10));
}

Code example source: reactor/reactor-core

@Test
public void discardOnOpenError() {
  StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ZERO, Duration.ofMillis(100)) // 0, 1, 2
                      .map(Long::intValue)
                      .take(3)
                      .bufferWhen(Flux.interval(Duration.ZERO, Duration.ofMillis(100)),
                          u -> (u == 2) ? null : Mono.never()))
        .thenAwait(Duration.ofSeconds(2))
        .expectErrorMessage("The bufferClose returned a null Publisher")
        .verifyThenAssertThat()
        .hasDiscardedExactly(0, 1, 1);
}

Code example source: reactor/reactor-core

@Test
public void discardOnBoundaryError() {
  StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ZERO, Duration.ofMillis(100)) // 0, 1, 2
                      .map(Long::intValue)
                      .take(3)
                      .bufferWhen(Flux.interval(Duration.ZERO, Duration.ofMillis(100)),
                          u -> (u == 2) ? Mono.error(new IllegalStateException("boom"))
                              : Mono.never()))
        .thenAwait(Duration.ofSeconds(2))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(0, 1, 1);
}

Code example source: reactor/reactor-core

@Test
public void openCloseMainError() {
  StepVerifier.create(Flux.error(new IllegalStateException("boom"))
      .bufferWhen(Flux.never(), a -> Flux.never())
  )
        .verifyErrorMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void discardOnError() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.error(new IllegalStateException("boom")))
              .bufferWhen(Mono.delay(Duration.ofSeconds(2)), u -> Mono.never()))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

Code example source: reactor/reactor-core

@Test
public void discardOnNextWhenNoBuffers() {
  StepVerifier.create(Flux.just(1, 2, 3)
              // the buffer does not open in time, so every value is discarded
              .bufferWhen(Mono.delay(Duration.ofSeconds(2)), u -> Mono.never()))
        .expectComplete()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

Code example source: reactor/reactor-core

@Test
public void discardOnCancel() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.never())
              .bufferWhen(Flux.just(1), u -> Mono.never()))
      .thenAwait(Duration.ofMillis(100))
      .thenCancel()
      .verifyThenAssertThat()
      .hasDiscardedExactly(1, 2, 3);
}

Code example source: reactor/reactor-core

@Test
public void openCloseErrorBackpressure() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> open = TestPublisher.create();
  TestPublisher<Integer> close = TestPublisher.create();
  StepVerifier.create(source.flux()
               .bufferWhen(open, o -> close), 0)
        .then(() -> {
          source.error(new IllegalStateException("boom"));
          open.assertNoSubscribers();
          close.assertNoSubscribers();
        })
        .verifyErrorMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void openCloseBadOpen() {
  TestPublisher<Object> badOpen = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(Flux.never()
              .bufferWhen(badOpen, o -> Flux.never()))
        .then(() -> {
          badOpen.error(new IOException("ioboom"));
          badOpen.complete();
          badOpen.next(1);
          badOpen.error(new IllegalStateException("boom"));
        })
        .expectErrorMessage("ioboom")
        .verifyThenAssertThat()
        .hasNotDroppedElements()
        .hasDroppedErrorWithMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void openCloseBadSource() {
  TestPublisher<Object> badSource =
      TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(badSource.flux()
      .bufferWhen(Flux.never(), a -> Flux.never()))
        .then(() -> {
          badSource.error(new IOException("ioboom"));
          badSource.complete();
          badSource.next(1);
          badSource.error(new IllegalStateException("boom"));
        })
        .expectErrorMessage("ioboom")
        .verifyThenAssertThat()
        .hasNotDroppedElements()
        .hasDroppedErrorWithMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void openCloseBadClose() {
  TestPublisher<Object> badClose = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(Flux.never()
              .bufferWhen(Flux.just(1).concatWith(Flux.never()), o -> badClose)
  )
        .then(() -> {
          badClose.error(new IOException("ioboom"));
          badClose.complete();
          badClose.next(1);
          badClose.error(new IllegalStateException("boom"));
        })
        .expectErrorMessage("ioboom")
        .verifyThenAssertThat()
        .hasNotDroppedElements()
        .hasDroppedErrorWithMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void openCloseEmptyBackpressure() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> open = TestPublisher.create();
  TestPublisher<Integer> close = TestPublisher.create();
  StepVerifier.create(source.flux()
               .bufferWhen(open, o -> close), 0)
        .then(() -> {
          source.complete();
          open.assertNoSubscribers();
          close.assertNoSubscribers();
        })
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void openCloseLimit() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> open = TestPublisher.create();
  TestPublisher<Integer> close = TestPublisher.create();
  StepVerifier.create(source.flux()
               .bufferWhen(open, o -> close)
               .limitRequest(1))
        .then(() -> {
          open.next(1);
          close.complete();
        })
        .then(() -> {
          source.assertNoSubscribers();
          open.assertNoSubscribers();
          close.assertNoSubscribers();
        })
        .expectNextMatches(List::isEmpty)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void openCloseTake() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> open = TestPublisher.create();
  TestPublisher<Integer> close = TestPublisher.create();
  StepVerifier.create(source.flux()
               .bufferWhen(open, o -> close)
               .take(1), 2)
        .then(() -> {
          open.next(1);
          close.complete();
        })
        .then(() -> {
          source.assertNoSubscribers();
          open.assertNoSubscribers();
          close.assertNoSubscribers();
        })
        .expectNextMatches(List::isEmpty)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void openCloseOpenCompletes() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> open = TestPublisher.create();
  TestPublisher<Integer> close = TestPublisher.create();
  StepVerifier.create(source.flux()
               .bufferWhen(open, o -> close)
  )
        .then(() -> {
          open.next(1);
          close.assertSubscribers();
        })
        .then(() -> {
          open.complete();
          source.assertSubscribers();
          close.assertSubscribers();
        })
        .then(() -> {
          close.complete();
          source.assertNoSubscribers();
        })
        .expectNextMatches(List::isEmpty)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void openCloseOpenCompletesNoBuffers() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> open = TestPublisher.create();
  TestPublisher<Integer> close = TestPublisher.create();
  StepVerifier.create(source.flux()
               .bufferWhen(open, o -> close))
        .then(() -> {
          open.next(1);
          close.assertSubscribers();
        })
        .then(() -> {
          close.complete();
          source.assertSubscribers();
          open.assertSubscribers();
        })
        .then(() -> {
          open.complete();
          source.assertNoSubscribers();
        })
        .expectNextMatches(List::isEmpty)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void openCloseDisposedOnComplete() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> open = TestPublisher.create();
  TestPublisher<Integer> close = TestPublisher.create();
  StepVerifier.create(source
      .flux()
      .bufferWhen(open, o -> close))
        .then(() -> {
          source.assertSubscribers();
          open.assertSubscribers();
          close.assertNoSubscribers();
          open.next(1);
        })
        .then(() -> {
          open.assertSubscribers();
          close.assertSubscribers();
          source.complete();
        })
        .expectNextMatches(List::isEmpty)
        .verifyComplete();
  open.assertNoSubscribers();
  close.assertNoSubscribers();
}
