本文整理了Java中reactor.core.publisher.Flux.elapsed()
方法的一些代码示例,展示了Flux.elapsed()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.elapsed()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:elapsed
[英]Map this Flux into reactor.util.function.Tuple2of timemillis and source data. The timemillis corresponds to the elapsed time between each signal as measured by the Schedulers#parallel() scheduler. First duration is measured between the subscription and the first element.
[中]将这个通量映射到反应堆中。util。作用timemillis和源数据的元组2。timemillis对应于调度器#parallel()调度器测量的每个信号之间经过的时间。第一个持续时间是在订阅和第一个元素之间测量的。
代码示例来源:origin: reactor/reactor-core
/**
* Map this {@link Flux} into {@link reactor.util.function.Tuple2 Tuple2<Long, T>}
* of timemillis and source data. The timemillis corresponds to the elapsed time
* between each signal as measured by the {@link Schedulers#parallel() parallel} scheduler.
* First duration is measured between the subscription and the first element.
*
* <p>
* <img class="marble" src="doc-files/marbles/elapsedForFlux.svg" alt="">
*
* @return a new {@link Flux} that emits a tuple of time elapsed in milliseconds and matching data
*/
public final Flux<Tuple2<Long, T>> elapsed() {
return elapsed(Schedulers.parallel());
}
代码示例来源:origin: reactor/reactor-core
Flux<Tuple2<Long, String>> scenario_aFluxCanBeBenchmarked(){
return Flux.just("test")
.elapsed();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxRetryRandomBackoff_noRandomness() {
Exception exception = new IOException("boom retry");
List<Long> elapsedList = new ArrayList<>();
StepVerifier.withVirtualTime(() ->
Flux.concat(Flux.range(0, 2), Flux.error(exception))
.retryBackoff(4, Duration.ofMillis(100), Duration.ofMillis(2000), 0)
.elapsed()
.doOnNext(elapsed -> { if (elapsed.getT2() == 0) elapsedList.add(elapsed.getT1());} )
.map(Tuple2::getT2)
)
.thenAwait(Duration.ofMinutes(1)) //ensure whatever the jittered delay that we have time to fit 4 retries
.expectNext(0, 1) //normal output
.expectNext(0, 1, 0, 1, 0, 1, 0, 1) //4 retry attempts
.expectErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
.hasMessage("Retries exhausted: 4/4")
.hasCause(exception))
.verify(Duration.ofSeconds(1)); //vts test shouldn't even take that long
assertThat(elapsedList).containsExactly(0L, 100L, 200L, 400L, 800L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanOperator() {
Flux<Tuple2<Long, Integer>> test = Flux.just(1).elapsed(Schedulers.single());
assertThat(test).isInstanceOf(Scannable.class);
assertThat(((Scannable) test).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.single());
}
代码示例来源:origin: reactor/reactor-core
Mono<Long> scenario_fluxItemCanBeShiftedByTime() {
return Flux.range(0, 10000)
.delayElements(Duration.ofMillis(150))
.elapsed()
.take(10)
.reduce(0L,
(acc, next) -> acc > 0l ? ((next.getT1() + acc) / 2) :
next.getT1());
}
代码示例来源:origin: reactor/reactor-core
Mono<Long> scenario_fluxItemCanBeShiftedByTime2() {
return Flux.range(0, 10000)
.delayElements(Duration.ofMillis(150))
.elapsed()
.take(10)
.reduce(0L,
(acc, next) -> acc > 0l ? ((next.getT1() + acc) / 2) :
next.getT1());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void delayFirstInterval() {
Supplier<Flux<Tuple2<Long, Long>>> test = () -> Flux.interval(Duration.ofMillis(50))
.delaySequence(Duration.ofMillis(500))
.elapsed()
.take(33);
StepVerifier.withVirtualTime(test)
.thenAwait(Duration.ofMillis(500 + 50))
.recordWith(ArrayList::new)
.assertNext(t2 -> assertThat(t2.getT1()).isEqualTo(550))
.thenAwait(Duration.ofMillis(33 * 50))
.thenConsumeWhile(t2 -> t2.getT1() == 50)
.consumeRecordedWith(record -> {
assertThat(record.stream().mapToLong(Tuple2::getT2))
.startsWith(0L, 1L, 2L)
.endsWith(30L, 31L, 32L)
.isSorted()
.hasSize(33);
})
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Ignore("delayElements test for local comparison run")
@Test
public void delayElements() {
Flux<Tuple2<Long, Long>> test = Flux.interval(Duration.ofMillis(50))
.onBackpressureDrop()
.delayElements(Duration.ofMillis(500))
.take(33)
.elapsed()
.log();
StepVerifier.create(test)
.thenConsumeWhile(t2 -> t2.getT1() >= 500)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void errorHandlingIntervalMillisRetried() throws InterruptedException {
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
VirtualTimeScheduler.set(virtualTimeScheduler);
Flux<Tuple2<Long,String>> flux =
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed(); // <1>
flux.subscribe(System.out::println, System.err::println); // <2>
//Thread.sleep(2100); // <3>
virtualTimeScheduler.advanceTimeBy(Duration.ofHours(1));
StepVerifier.withVirtualTime(() -> flux, () -> virtualTimeScheduler, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT2().equals("tick 0"))
.expectNextMatches(t -> t.getT2().equals("tick 1"))
.expectNextMatches(t -> t.getT2().equals("tick 2"))
.expectNextMatches(t -> t.getT2().equals("tick 0"))
.expectNextMatches(t -> t.getT2().equals("tick 1"))
.expectNextMatches(t -> t.getT2().equals("tick 2"))
.verifyErrorMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void analyticsTest() throws Exception {
ReplayProcessor<Integer> source = ReplayProcessor.create();
long avgTime = 50l;
Mono<Long> result = source
.log("delay")
.publishOn(asyncGroup)
.delayElements(Duration.ofMillis(avgTime))
.elapsed()
.skip(1)
.groupBy(w -> w.getT1())
.flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
.log("elapsed")
.collectSortedList(Comparator.comparing(Tuple2::getT1))
.flatMapMany(Flux::fromIterable)
.reduce(-1L, (acc, next) -> acc > 0l ? ((next.getT1() + acc) / 2) : next.getT1())
.log("reduced-elapsed")
.cache();
source.subscribe();
for (int j = 0; j < 10; j++) {
source.onNext(1);
}
source.onComplete();
Assert.assertTrue(result.block(Duration.ofSeconds(5)) >= avgTime * 0.6);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFluxHistoryTTL() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(2, Duration.ofMillis(2000))
.autoConnect()
.elapsed();
StepVerifier.create(source)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.create(source)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFluxTTL() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000)
, vts)
.cache(Duration.ofMillis(2000), vts)
.elapsed(vts);
StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFluxHistoryTTL() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis
(1000), vts)
.cache(2, Duration.ofMillis(2000), vts)
.elapsed(vts);
StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFluxTTL() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(Duration.ofMillis(2000))
.autoConnect()
.elapsed();
StepVerifier.create(source)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.create(source)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFluxTTLMillis() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(Duration.ofMillis(2000), vts)
.autoConnect()
.elapsed();
StepVerifier.create(source)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.create(source)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFluxTTLFused() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(Duration.ofMillis(2000))
.autoConnect()
.elapsed();
StepVerifier.create(source)
.expectFusion(Fuseable.ANY)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.create(source)
.expectFusion(Fuseable.ANY)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFluxHistoryTTLFused() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay(2, Duration.ofMillis(2000))
.autoConnect()
.elapsed()
.log();
StepVerifier.create(source)
.expectFusion(Fuseable.ANY)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.create(source)
.expectFusion(Fuseable.ANY)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFlux() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000)
, vts)
.cache()
.elapsed(vts);
StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFlux() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay()
.autoConnect()
.elapsed();
StepVerifier.create(source)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.create(source)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void cacheFluxFused() {
Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.replay()
.autoConnect()
.elapsed();
StepVerifier.create(source)
.expectFusion(Fuseable.ANY)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();
StepVerifier.create(source)
.expectFusion(Fuseable.ANY)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
.verifyComplete();
}
内容来源于网络,如有侵权,请联系作者删除!