Usage and code examples of the reactor.core.publisher.Flux.elapsed() method

x33g5p2x, reposted on 2022-01-19 in Other

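This article collects Java code examples of the reactor.core.publisher.Flux.elapsed() method and shows how Flux.elapsed() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.elapsed() method are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: elapsed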

Introduction to Flux.elapsed

Map this Flux into reactor.util.function.Tuple2<Long, T> of elapsed time in milliseconds and source data. The time value is the elapsed time between each signal, as measured by the Schedulers#parallel() scheduler. The first duration is measured between the subscription and the first element.
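
The timing rule described above can be modelled in plain Java without Reactor. The sketch below is only an illustration of that rule, not Reactor's implementation: the names ElapsedSketch, ElapsedTracker, and elapsedFor are hypothetical, a manually advanced LongSupplier stands in for the scheduler's clock, and Map.Entry stands in for Tuple2.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Plain-Java model of the elapsed() timing rule only (hypothetical names, not Reactor code).
public class ElapsedSketch {

    // Pairs each value with the time since the previous value,
    // or since construction ("subscription") for the first value.
    static final class ElapsedTracker<T> {
        private final LongSupplier clock; // assumption: stands in for the scheduler's clock
        private long last;

        ElapsedTracker(LongSupplier clock) {
            this.clock = clock;
            this.last = clock.getAsLong(); // reference point: subscription time
        }

        Map.Entry<Long, T> onNext(T value) {
            long now = clock.getAsLong();
            long delta = now - last;
            last = now; // the next delta is measured from this signal
            return new SimpleImmutableEntry<>(delta, value);
        }
    }

    // Replays values arriving at the given absolute times (ms), with subscription at time 0.
    public static List<Long> elapsedFor(long... arrivalMillis) {
        long[] now = {0L}; // manually advanced fake clock, so results are deterministic
        ElapsedTracker<Integer> tracker = new ElapsedTracker<>(() -> now[0]);
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < arrivalMillis.length; i++) {
            now[0] = arrivalMillis[i];
            result.add(tracker.onNext(i).getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(elapsedFor(550, 600, 650)); // prints [550, 50, 50]
    }
}
```

The first value reports 550 because it is measured from the subscription, while later values report only the gap since the previous signal. This is the same shape that the delayFirstInterval test further down asserts on.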

Code examples

Code example source: reactor/reactor-core

/**
 * Map this {@link Flux} into {@link reactor.util.function.Tuple2 Tuple2<Long, T>}
 * of timemillis and source data. The timemillis corresponds to the elapsed time
 * between each signal as measured by the {@link Schedulers#parallel() parallel} scheduler.
 * First duration is measured between the subscription and the first element.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/elapsedForFlux.svg" alt="">
 *
 * @return a new {@link Flux} that emits a tuple of time elapsed in milliseconds and matching data
 */
public final Flux<Tuple2<Long, T>> elapsed() {
  return elapsed(Schedulers.parallel());
}

Code example source: reactor/reactor-core

Flux<Tuple2<Long, String>> scenario_aFluxCanBeBenchmarked(){
  return Flux.just("test")
        .elapsed();
}

Code example source: reactor/reactor-core

@Test
public void fluxRetryRandomBackoff_noRandomness() {
  Exception exception = new IOException("boom retry");
  List<Long> elapsedList = new ArrayList<>();
  StepVerifier.withVirtualTime(() ->
      Flux.concat(Flux.range(0, 2), Flux.error(exception))
        .retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0)
        .elapsed()
        .doOnNext(elapsed -> { if (elapsed.getT2() == 0) elapsedList.add(elapsed.getT1());} )
        .map(Tuple2::getT2)
  )
        .thenAwait(Duration.ofMinutes(1)) //ensure whatever the jittered delay that we have time to fit 4 retries
        .expectNext(0, 1) //normal output
        .expectNext(0, 1, 0, 1, 0, 1, 0, 1) //4 retry attempts
        .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Retries exhausted: 4/4")
                            .hasCause(exception))
        .verify(Duration.ofSeconds(1)); //vts test shouldn't even take that long
  assertThat(elapsedList).containsExactly(0L, 100L, 200L, 400L, 800L);
}

Code example source: reactor/reactor-core

@Test
public void scanOperator() {
  Flux<Tuple2<Long, Integer>> test = Flux.just(1).elapsed(Schedulers.single());
  assertThat(test).isInstanceOf(Scannable.class);
  assertThat(((Scannable) test).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.single());
}

Code example source: reactor/reactor-core

Mono<Long> scenario_fluxItemCanBeShiftedByTime() {
  return Flux.range(0, 10000)
        .delayElements(Duration.ofMillis(150))
        .elapsed()
        .take(10)
        .reduce(0L,
            (acc, next) -> acc > 0L ? ((next.getT1() + acc) / 2) :
                next.getT1());
}

Code example source: reactor/reactor-core

Mono<Long> scenario_fluxItemCanBeShiftedByTime2() {
  return Flux.range(0, 10000)
        .delayElements(Duration.ofMillis(150))
        .elapsed()
        .take(10)
        .reduce(0L,
            (acc, next) -> acc > 0L ? ((next.getT1() + acc) / 2) :
                next.getT1());
}

Code example source: reactor/reactor-core

@Test
public void delayFirstInterval() {
  Supplier<Flux<Tuple2<Long, Long>>> test = () -> Flux.interval(Duration.ofMillis(50))
                          .delaySequence(Duration.ofMillis(500))
                          .elapsed()
                            .take(33);
  StepVerifier.withVirtualTime(test)
        .thenAwait(Duration.ofMillis(500 + 50))
        .recordWith(ArrayList::new)
        .assertNext(t2 -> assertThat(t2.getT1()).isEqualTo(550))
        .thenAwait(Duration.ofMillis(33 * 50))
        .thenConsumeWhile(t2 -> t2.getT1() == 50)
        .consumeRecordedWith(record -> {
          assertThat(record.stream().mapToLong(Tuple2::getT2))
               .startsWith(0L, 1L, 2L)
               .endsWith(30L, 31L, 32L)
               .isSorted()
               .hasSize(33);
        })
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Ignore("delayElements test for local comparison run")
@Test
public void delayElements() {
  Flux<Tuple2<Long, Long>> test = Flux.interval(Duration.ofMillis(50))
                    .onBackpressureDrop()
                    .delayElements(Duration.ofMillis(500))
                    .take(33)
                    .elapsed()
                    .log();
  StepVerifier.create(test)
        .thenConsumeWhile(t2 -> t2.getT1() >= 500)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void errorHandlingIntervalMillisRetried() throws InterruptedException {
  VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
  VirtualTimeScheduler.set(virtualTimeScheduler);
  Flux<Tuple2<Long,String>> flux =
  Flux.interval(Duration.ofMillis(250))
    .map(input -> {
      if (input < 3) return "tick " + input;
      throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed(); // <1>
  flux.subscribe(System.out::println, System.err::println); // <2>
  //Thread.sleep(2100); // <3>
  virtualTimeScheduler.advanceTimeBy(Duration.ofHours(1));
  StepVerifier.withVirtualTime(() -> flux, () -> virtualTimeScheduler, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT2().equals("tick 0"))
        .expectNextMatches(t -> t.getT2().equals("tick 1"))
        .expectNextMatches(t -> t.getT2().equals("tick 2"))
        .expectNextMatches(t -> t.getT2().equals("tick 0"))
        .expectNextMatches(t -> t.getT2().equals("tick 1"))
        .expectNextMatches(t -> t.getT2().equals("tick 2"))
        .verifyErrorMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void analyticsTest() throws Exception {
  ReplayProcessor<Integer> source = ReplayProcessor.create();
  long avgTime = 50L;
  Mono<Long> result = source
      .log("delay")
      .publishOn(asyncGroup)
               .delayElements(Duration.ofMillis(avgTime))
               .elapsed()
               .skip(1)
               .groupBy(w -> w.getT1())
               .flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
               .log("elapsed")
               .collectSortedList(Comparator.comparing(Tuple2::getT1))
               .flatMapMany(Flux::fromIterable)
               .reduce(-1L, (acc, next) -> acc > 0L ? ((next.getT1() + acc) / 2) : next.getT1())
               .log("reduced-elapsed")
               .cache();
  source.subscribe();
  for (int j = 0; j < 10; j++) {
    source.onNext(1);
  }
  source.onComplete();
  Assert.assertTrue(result.block(Duration.ofSeconds(5)) >= avgTime * 0.6);
}

Code example source: reactor/reactor-core

@Test
public void cacheFluxHistoryTTL() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(2, Duration.ofMillis(2000))
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFluxTTL() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000)
                           , vts)
                       .cache(Duration.ofMillis(2000), vts)
                       .elapsed(vts);
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFluxHistoryTTL() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis
                           (1000), vts)
                       .cache(2, Duration.ofMillis(2000), vts)
                       .elapsed(vts);
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFluxTTL() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(Duration.ofMillis(2000))
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFluxTTLMillis() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(Duration.ofMillis(2000), vts)
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFluxTTLFused() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(Duration.ofMillis(2000))
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFluxHistoryTTLFused() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(2, Duration.ofMillis(2000))
                       .autoConnect()
                       .elapsed()
      .log();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFlux() {
  VirtualTimeScheduler vts = VirtualTimeScheduler.create();
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000)
                           , vts)
                       .cache()
                       .elapsed(vts);
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
        .thenAwait(Duration.ofSeconds(3))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFlux() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay()
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void cacheFluxFused() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay()
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}
