reactor.core.publisher.Flux.windowTimeout()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.4k)|赞(0)|评价(0)|浏览(300)

本文整理了Java中reactor.core.publisher.Flux.windowTimeout()方法的一些代码示例,展示了Flux.windowTimeout()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.windowTimeout()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:windowTimeout

Flux.windowTimeout介绍

[英]Split this Flux sequence into multiple Flux windows containing maxSize elements (or less for the final window) and starting from the first item. Each Flux window will onComplete once it contains maxSize elements OR it has been open for the given Duration (as measured on the Schedulers#parallel()Scheduler).
[中]将此通量序列拆分为多个通量窗口,其中包含maxSize元素(对于最后一个窗口,小于此值),并从第一项开始。一旦每个通量窗口包含maxSize元素或在给定的持续时间内打开(在Schedulers#parallel()Scheduler上测量),它就会完成。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Split this {@link Flux} sequence into multiple {@link Flux} windows containing
 * {@code maxSize} elements (or less for the final window) and starting from the first item.
 * Each {@link Flux} window will onComplete once it contains {@code maxSize} elements
 * OR it has been open for the given {@link Duration} (as measured on the {@link Schedulers#parallel() parallel}
 * Scheduler).
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/windowTimeout.svg" alt="">
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure
 * upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the maximum number of items to emit in the window before closing it
 * @param maxTime the maximum {@link Duration} since the window was opened before closing it
 *
 * @return a {@link Flux} of {@link Flux} windows based on element count and duration
 */
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime) {
  return windowTimeout(maxSize, maxTime , Schedulers.parallel());
}

代码示例来源:origin: reactor/reactor-core

Flux<List<Integer>> scenario_windowWithTimeoutAccumulateOnTimeOrSize() {
  return Flux.range(1, 6)
        .delayElements(Duration.ofMillis(300))
        .windowTimeout(5, Duration.ofMillis(2000))
        .concatMap(Flux::buffer);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void windowTimeoutComplete() throws Exception {
  Flux<Flux<Integer>> windows = source.windowTimeout(5, Duration.ofMillis(200));
  subscribe(windows);
  generate(0, 3);
  Thread.sleep(300);
  generateAndComplete(3, 3);
  verifyMainComplete(Arrays.asList(0, 1, 2), Arrays.asList(3, 4, 5));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void windowTimeoutInnerCancel() throws Exception {
  Flux<Flux<Integer>> windows = source.windowTimeout(5, Duration.ofMillis(5000));
  subscribe(windows);
  generateWithCancel(0, 6, 1);
  verifyInnerCancel(0, i -> i != 2, Arrays.asList(0, 1));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noDelayMultipleOfSize() {
  StepVerifier.create(Flux.range(1, 10)
              .windowTimeout(5, Duration.ofSeconds(1))
              .concatMap(Flux::collectList)
  )
        .assertNext(l -> assertThat(l).containsExactly(1, 2, 3, 4, 5))
        .assertNext(l -> assertThat(l).containsExactly(6, 7, 8, 9, 10))
        .assertNext(l -> assertThat(l).isEmpty())
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void windowTimeoutMainCancelNoNewWindow() throws Exception {
  Flux<Flux<Integer>> windows = source.windowTimeout(5, Duration.ofMillis(200));
  subscribe(windows);
  generate(0, 1);
  Thread.sleep(300);
  generate(1, 1);
  mainSubscriber.cancel();
  Thread.sleep(300);
  generate(2, 1);
  verifyMainCancelNoNewWindow(1, Arrays.asList(0), Arrays.asList(1));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noDelayGreaterThanSize() {
  StepVerifier.create(Flux.range(1, 12)
              .windowTimeout(5, Duration.ofHours(1))
              .concatMap(Flux::collectList)
  )
        .assertNext(l -> assertThat(l).containsExactly(1, 2, 3, 4, 5))
        .assertNext(l -> assertThat(l).containsExactly(6, 7, 8, 9, 10))
        .assertNext(l -> assertThat(l).containsExactly(11, 12))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void longEmptyEmitsEmptyWindowsRegularly() {
  StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofMillis(350))
                      .ignoreElement()
                      .as(Flux::from)
                      .windowTimeout(1000, Duration.ofMillis(100))
                      .concatMap(Flux::collectList)
  )
        .thenAwait(Duration.ofMinutes(1))
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).isEmpty())
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void longDelaysStartEndEmitEmptyWindows() {
  StepVerifier.withVirtualTime(() ->
    Mono.just("foo")
      .delayElement(Duration.ofMillis(400 + 400 + 300))
      .concatWith(Mono.delay(Duration.ofMillis(100 + 400 + 100)).then(Mono.empty()))
      .windowTimeout(1000, Duration.ofMillis(400))
      .concatMap(Flux::collectList)
  )
        .thenAwait(Duration.ofHours(1))
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).containsExactly("foo"))
        .assertNext(l -> assertThat(l).isEmpty())
        .assertNext(l -> assertThat(l).isEmpty()) //closing window
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void windowTimeoutMainCancel() throws Exception {
  Flux<Flux<Integer>> windows = source.windowTimeout(10, Duration.ofMillis(100));
  subscribe(windows);
  generate(0, 2);
  mainSubscriber.cancel();
  generate(2, 3);
  Thread.sleep(200);
  generate(5, 10);
  verifyMainCancel(true);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void windowWithTimeoutStartsTimerOnSubscription() {
  StepVerifier.withVirtualTime(() ->
      Mono.delay(Duration.ofMillis(300))
        .thenMany(Flux.range(1, 3))
        .delayElements(Duration.ofMillis(150))
        .concatWith(Flux.range(4, 10).delaySubscription(Duration.ofMillis(500)))
        .windowTimeout(10, Duration.ofMillis(500))
        .flatMap(Flux::collectList)
  )
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(100))
        .assertNext(l -> assertThat(l).containsExactly(1))
        .assertNext(l -> assertThat(l).containsExactly(2, 3))
        .assertNext(l -> assertThat(l).containsExactly(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
        .assertNext(l -> assertThat(l).isEmpty())
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testIssue912() {
  StepVerifier.withVirtualTime(() -> Flux.concat(
      Flux.just("#").delayElements(Duration.ofMillis(20)),
      Flux.range(1, 10),
      Flux.range(11, 5).delayElements(Duration.ofMillis(15))
  )
    .windowTimeout(10, Duration.ofMillis(1)).concatMap(w -> w).log())
        .thenAwait(Duration.ofMillis(95))
        .expectNextCount(16)
  .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

.windowTimeout(2, Duration.ofSeconds(2), testScheduler)
.concatMap(w -> {
  reject.set(true);

代码示例来源:origin: reactor/reactor-core

@Test
public void rejectedOnSubscription() {
  Scheduler testScheduler = new Scheduler() {
    @Override
    public Disposable schedule(Runnable task) {
      throw Exceptions.failWithRejected();
    }
    @Override
    public Worker createWorker() {
      return new Worker() {
        @Override
        public Disposable schedule(Runnable task) {
          throw Exceptions.failWithRejected();
        }
        @Override
        public void dispose() {
        }
      };
    }
  };
  StepVerifier.create(Flux.range(1, 3).hide()
              .windowTimeout(10, Duration.ofMillis(500), testScheduler))
        .expectNextCount(1)
        .verifyError(RejectedExecutionException.class);
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Split this {@link Flux} sequence into multiple {@link Flux} windows containing
 * {@code maxSize} elements (or less for the final window) and starting from the first item.
 * Each {@link Flux} window will onComplete once it contains {@code maxSize} elements
 * OR it has been open for the given {@link Duration} (as measured on the {@link Schedulers#parallel() parallel}
 * Scheduler).
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/windowTimeout.svg" alt="">
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure
 * upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the maximum number of items to emit in the window before closing it
 * @param maxTime the maximum {@link Duration} since the window was opened before closing it
 *
 * @return a {@link Flux} of {@link Flux} windows based on element count and duration
 */
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime) {
  return windowTimeout(maxSize, maxTime , Schedulers.parallel());
}

代码示例来源:origin: io.netifi.proteus/tracing-openzipkin

.windowTimeout(128, Duration.ofMillis(1000))
.map(
  strings ->

代码示例来源:origin: io.netifi.proteus/proteus-tracing-openzipkin

.windowTimeout(128, Duration.ofMillis(1000))
.map(
  strings ->

代码示例来源:origin: netifi-proteus/proteus-java

.windowTimeout(128, Duration.ofMillis(1000))
.map(
  strings ->

代码示例来源:origin: netifi-proteus/proteus-java

Publisher<SimpleRequest> messages, ByteBuf metadata) {
return Flux.from(messages)
  .windowTimeout(10, Duration.ofSeconds(500))
  .take(1)
  .flatMap(Function.identity())

代码示例来源:origin: akarnokd/akarnokd-misc

Flux.range(11, 5).delayElements(Duration.ofMillis(15))
.windowTimeout(10, Duration.ofMillis(1))
.subscribe(flx -> flx.subscribe(System.out::println));

相关文章

Flux类方法