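This article collects code examples for the Java method reactor.core.publisher.Flux.windowTimeout() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects as a practical reference. Details of Flux.windowTimeout() are as follows: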
Package: reactor.core.publisher
Class name: Flux
Method name: windowTimeout
Splits this Flux sequence into multiple Flux windows containing maxSize elements (or fewer for the final window), starting from the first item. Each Flux window completes (onComplete) once it contains maxSize elements or has been open for the given Duration, as measured on the Schedulers#parallel() Scheduler.
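To make the "maxSize OR Duration" rule concrete, here is a minimal sketch that replays pre-recorded arrival times through a simplified, single-threaded model. It is not Reactor code and not how the operator is implemented: the class WindowTimeoutModel, its windows method, and all parameter names are hypothetical. It also assumes that the first window opens at subscription time, that a new window opens as soon as the previous one closes, and that a timer tie is resolved in favor of the timer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, single-threaded model of "close on maxSize OR maxTime" windowing.
 * This is NOT Reactor code; it only replays pre-recorded arrival times.
 */
final class WindowTimeoutModel {

    /**
     * @param arrivalMillis    non-decreasing arrival time of each value, in ms since subscription
     * @param values           the values, same length as arrivalMillis
     * @param completeAtMillis time at which the source completes
     */
    static <T> List<List<T>> windows(long[] arrivalMillis, List<T> values,
                                     long completeAtMillis, int maxSize, long maxTimeMillis) {
        if (maxSize <= 0 || maxTimeMillis <= 0 || arrivalMillis.length != values.size()) {
            throw new IllegalArgumentException("invalid arguments");
        }
        List<List<T>> closed = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long openedAt = 0; // assumption: the first window opens at subscription time

        for (int i = 0; i < arrivalMillis.length; i++) {
            // Close every window whose timer fired before this value arrived.
            // Ties go to the timer, an arbitrary choice made for this sketch.
            while (arrivalMillis[i] >= openedAt + maxTimeMillis) {
                closed.add(current);
                current = new ArrayList<>();
                openedAt += maxTimeMillis;
            }
            current.add(values.get(i));
            if (current.size() == maxSize) { // size limit reached: close and reopen
                closed.add(current);
                current = new ArrayList<>();
                openedAt = arrivalMillis[i];
            }
        }
        // Timers keep firing on an idle source until it completes.
        while (completeAtMillis >= openedAt + maxTimeMillis) {
            closed.add(current);
            current = new ArrayList<>();
            openedAt += maxTimeMillis;
        }
        closed.add(current); // completion closes the last open window, even if empty
        return closed;
    }

    public static void main(String[] args) {
        // Ten values arriving at once, maxSize 5, timeout 1 s.
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(windows(new long[10], values, 0, 5, 1000));
        // prints [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], []]
    }
}
```

Under these assumptions the model yields a trailing empty window when the element count is an exact multiple of maxSize, which is the same shape the noDelayMultipleOfSize test in this article asserts. Real operator behavior under backpressure or across Reactor versions may differ, so treat the model only as a reading aid.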
Code example source: reactor/reactor-core
/**
 * Split this {@link Flux} sequence into multiple {@link Flux} windows containing
 * {@code maxSize} elements (or less for the final window) and starting from the first item.
 * Each {@link Flux} window will onComplete once it contains {@code maxSize} elements
 * OR it has been open for the given {@link Duration} (as measured on the {@link Schedulers#parallel() parallel}
 * Scheduler).
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/windowTimeout.svg" alt="">
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure
 * upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the maximum number of items to emit in the window before closing it
 * @param maxTime the maximum {@link Duration} since the window was opened before closing it
 *
 * @return a {@link Flux} of {@link Flux} windows based on element count and duration
 */
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime) {
    return windowTimeout(maxSize, maxTime, Schedulers.parallel());
}
Code example source: reactor/reactor-core
Flux<List<Integer>> scenario_windowWithTimeoutAccumulateOnTimeOrSize() {
    return Flux.range(1, 6)
               .delayElements(Duration.ofMillis(300))
               .windowTimeout(5, Duration.ofMillis(2000))
               .concatMap(Flux::buffer);
}
Code example source: reactor/reactor-core
@Test
public void windowTimeoutComplete() throws Exception {
    Flux<Flux<Integer>> windows = source.windowTimeout(5, Duration.ofMillis(200));
    subscribe(windows);
    generate(0, 3);
    Thread.sleep(300);
    generateAndComplete(3, 3);
    verifyMainComplete(Arrays.asList(0, 1, 2), Arrays.asList(3, 4, 5));
}
Code example source: reactor/reactor-core
@Test
public void windowTimeoutInnerCancel() throws Exception {
    Flux<Flux<Integer>> windows = source.windowTimeout(5, Duration.ofMillis(5000));
    subscribe(windows);
    generateWithCancel(0, 6, 1);
    verifyInnerCancel(0, i -> i != 2, Arrays.asList(0, 1));
}
Code example source: reactor/reactor-core
@Test
public void noDelayMultipleOfSize() {
    StepVerifier.create(Flux.range(1, 10)
                            .windowTimeout(5, Duration.ofSeconds(1))
                            .concatMap(Flux::collectList)
    )
                .assertNext(l -> assertThat(l).containsExactly(1, 2, 3, 4, 5))
                .assertNext(l -> assertThat(l).containsExactly(6, 7, 8, 9, 10))
                .assertNext(l -> assertThat(l).isEmpty())
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void windowTimeoutMainCancelNoNewWindow() throws Exception {
    Flux<Flux<Integer>> windows = source.windowTimeout(5, Duration.ofMillis(200));
    subscribe(windows);
    generate(0, 1);
    Thread.sleep(300);
    generate(1, 1);
    mainSubscriber.cancel();
    Thread.sleep(300);
    generate(2, 1);
    verifyMainCancelNoNewWindow(1, Arrays.asList(0), Arrays.asList(1));
}
Code example source: reactor/reactor-core
@Test
public void noDelayGreaterThanSize() {
    StepVerifier.create(Flux.range(1, 12)
                            .windowTimeout(5, Duration.ofHours(1))
                            .concatMap(Flux::collectList)
    )
                .assertNext(l -> assertThat(l).containsExactly(1, 2, 3, 4, 5))
                .assertNext(l -> assertThat(l).containsExactly(6, 7, 8, 9, 10))
                .assertNext(l -> assertThat(l).containsExactly(11, 12))
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void longEmptyEmitsEmptyWindowsRegularly() {
    StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofMillis(350))
                                           .ignoreElement()
                                           .as(Flux::from)
                                           .windowTimeout(1000, Duration.ofMillis(100))
                                           .concatMap(Flux::collectList)
    )
                .thenAwait(Duration.ofMinutes(1))
                .assertNext(l -> assertThat(l).isEmpty())
                .assertNext(l -> assertThat(l).isEmpty())
                .assertNext(l -> assertThat(l).isEmpty())
                .assertNext(l -> assertThat(l).isEmpty())
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void longDelaysStartEndEmitEmptyWindows() {
    StepVerifier.withVirtualTime(() ->
            Mono.just("foo")
                .delayElement(Duration.ofMillis(400 + 400 + 300))
                .concatWith(Mono.delay(Duration.ofMillis(100 + 400 + 100)).then(Mono.empty()))
                .windowTimeout(1000, Duration.ofMillis(400))
                .concatMap(Flux::collectList)
    )
                .thenAwait(Duration.ofHours(1))
                .assertNext(l -> assertThat(l).isEmpty())
                .assertNext(l -> assertThat(l).isEmpty())
                .assertNext(l -> assertThat(l).containsExactly("foo"))
                .assertNext(l -> assertThat(l).isEmpty())
                .assertNext(l -> assertThat(l).isEmpty()) //closing window
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void windowTimeoutMainCancel() throws Exception {
    Flux<Flux<Integer>> windows = source.windowTimeout(10, Duration.ofMillis(100));
    subscribe(windows);
    generate(0, 2);
    mainSubscriber.cancel();
    generate(2, 3);
    Thread.sleep(200);
    generate(5, 10);
    verifyMainCancel(true);
}
Code example source: reactor/reactor-core
@Test
public void windowWithTimeoutStartsTimerOnSubscription() {
    StepVerifier.withVirtualTime(() ->
            Mono.delay(Duration.ofMillis(300))
                .thenMany(Flux.range(1, 3))
                .delayElements(Duration.ofMillis(150))
                .concatWith(Flux.range(4, 10).delaySubscription(Duration.ofMillis(500)))
                .windowTimeout(10, Duration.ofMillis(500))
                .flatMap(Flux::collectList)
    )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(100))
                .assertNext(l -> assertThat(l).containsExactly(1))
                .assertNext(l -> assertThat(l).containsExactly(2, 3))
                .assertNext(l -> assertThat(l).containsExactly(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
                .assertNext(l -> assertThat(l).isEmpty())
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void testIssue912() {
    StepVerifier.withVirtualTime(() -> Flux.concat(
            Flux.just("#").delayElements(Duration.ofMillis(20)),
            Flux.range(1, 10),
            Flux.range(11, 5).delayElements(Duration.ofMillis(15))
    )
                                           .windowTimeout(10, Duration.ofMillis(1))
                                           .concatMap(w -> w)
                                           .log())
                .thenAwait(Duration.ofMillis(95))
                .expectNextCount(16)
                .verifyComplete();
}
Code example source: reactor/reactor-core
.windowTimeout(2, Duration.ofSeconds(2), testScheduler)
.concatMap(w -> {
    reject.set(true);
Code example source: reactor/reactor-core
@Test
public void rejectedOnSubscription() {
    Scheduler testScheduler = new Scheduler() {
        @Override
        public Disposable schedule(Runnable task) {
            throw Exceptions.failWithRejected();
        }

        @Override
        public Worker createWorker() {
            return new Worker() {
                @Override
                public Disposable schedule(Runnable task) {
                    throw Exceptions.failWithRejected();
                }

                @Override
                public void dispose() {
                }
            };
        }
    };
    StepVerifier.create(Flux.range(1, 3).hide()
                            .windowTimeout(10, Duration.ofMillis(500), testScheduler))
                .expectNextCount(1)
                .verifyError(RejectedExecutionException.class);
}
Code example source: io.netifi.proteus/tracing-openzipkin (the identical fragment also appears in io.netifi.proteus/proteus-tracing-openzipkin and netifi-proteus/proteus-java)
.windowTimeout(128, Duration.ofMillis(1000))
.map(
    strings ->
Code example source: netifi-proteus/proteus-java
    Publisher<SimpleRequest> messages, ByteBuf metadata) {
  return Flux.from(messages)
      .windowTimeout(10, Duration.ofSeconds(500))
      .take(1)
      .flatMap(Function.identity())
Code example source: akarnokd/akarnokd-misc
Flux.range(11, 5).delayElements(Duration.ofMillis(15))
    .windowTimeout(10, Duration.ofMillis(1))
    .subscribe(flx -> flx.subscribe(System.out::println));