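This article collects code examples for the Java method reactor.core.publisher.Flux.windowUntil() and shows how Flux.windowUntil() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Flux.windowUntil() are as follows: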
Package: reactor.core.publisher
Class name: Flux
Method name: windowUntil
Split this Flux sequence into multiple Flux windows delimited by the given predicate. A new window is opened each time the predicate returns true, at which point the previous window will receive the triggering element then onComplete.
Windows are lazily made available downstream at the point where they receive their first event (an element is pushed, the window errors). This variant shouldn't expose empty windows, as the separators are emitted into the windows they close.
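To make the grouping rule described above concrete, here is a minimal plain-Java sketch that mimics how the default variant assigns elements to windows. It works on a finite List rather than a Flux, and the class WindowUntilSketch and its static windowUntil helper are hypothetical names invented for illustration. It is not Reactor's implementation and it ignores laziness, backpressure, and error signals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration class; not part of reactor-core.
final class WindowUntilSketch {

    // Groups a finite list the way the default windowUntil(predicate) groups a Flux:
    // the element that matches the predicate is the last element of the window it closes.
    static <T> List<List<T>> windowUntil(List<T> source, Predicate<? super T> boundaryTrigger) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            current.add(item);
            if (boundaryTrigger.test(item)) {
                // The delimiter has been added, so this window is complete.
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        // A trailing window without a delimiter is still emitted; an empty one is not.
        if (!current.isEmpty()) {
            windows.add(current);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<String> colors = Arrays.asList("red", "green", "#", "orange", "blue", "#", "black", "white");
        System.out.println(windowUntil(colors, "#"::equals));
        // prints [[red, green, #], [orange, blue, #], [black, white]]
    }
}
```

Because each delimiter is placed into the window it closes, a source that ends with a delimiter leaves no trailing empty window in this sketch, which is the property the description above points out.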
Code example origin: reactor/reactor-core
/**
 * Split this {@link Flux} sequence into multiple {@link Flux} windows delimited by the
 * given predicate. A new window is opened each time the predicate returns true, at which
 * point the previous window will receive the triggering element then onComplete.
 * <p>
 * Windows are lazily made available downstream at the point where they receive their
 * first event (an element is pushed, the window errors). This variant shouldn't
 * expose empty windows, as the separators are emitted into
 * the windows they close.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/windowUntil.svg" alt="">
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure
 * upon cancellation or error triggered by a data signal.
 *
 * @param boundaryTrigger a predicate that triggers the next window when it becomes true.
 * @return a {@link Flux} of {@link Flux} windows, bounded depending
 * on the predicate.
 */
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger) {
    // Delegates to the two-argument overload with cutBefore = false.
    return windowUntil(boundaryTrigger, false);
}
Code example origin: reactor/reactor-core
@Override
protected List<Scenario<String, Flux<String>>> scenarios_errorFromUpstreamFailure() {
    return Arrays.asList(
        scenario(f -> f.windowWhile(t -> true)),
        scenario(f -> f.windowUntil(t -> true)),
        scenario(f -> f.windowUntil(t -> true, true))
    );
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilMainCancelNoNewWindow() throws Exception {
    Flux<Flux<Integer>> windows = source.windowUntil(i -> i % 3 == 0);
    subscribe(windows);
    generateWithCancel(0, 4, 1);
    verifyMainCancelNoNewWindow(2, Arrays.asList(0), Arrays.asList(1, 2, 3));
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilComplete() throws Exception {
    Flux<Flux<Integer>> windows = source.windowUntil(i -> i % 3 == 0);
    subscribe(windows);
    generateAndComplete(1, 5);
    verifyMainComplete(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilCutBeforeIntentionallyEmptyWindows() {
    // With cutBefore = true, each "#" starts a new window instead of ending the previous one.
    Flux.just("ALPHA", "#", "BETA", "#", "#")
        .windowUntil("#"::equals, true)
        .flatMap(Flux::collectList)
        .as(StepVerifier::create)
        .assertNext(w -> assertThat(w).containsExactly("ALPHA"))
        .assertNext(w -> assertThat(w).containsExactly("#", "BETA"))
        .assertNext(w -> assertThat(w).containsExactly("#"))
        .assertNext(w -> assertThat(w).containsExactly("#"))
        .verifyComplete();
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilMainCancel() throws Exception {
    Flux<Flux<Integer>> windows = source.windowUntil(i -> i % 3 == 0);
    subscribe(windows);
    generateWithCancel(1, 4, 10);
    verifyMainCancel(true, Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilInnerCancel() throws Exception {
    Flux<Flux<Integer>> windows = source.windowUntil(i -> i % 3 == 0);
    subscribe(windows);
    generateWithCancel(0, 6, 1);
    verifyInnerCancel(1, i -> i != 3, Arrays.asList(0), Arrays.asList(1, 2));
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilIntentionallyEmptyWindows() {
    // Default (cut after): each "#" is the last element of the window it closes.
    Flux.just("ALPHA", "#", "BETA", "#", "#")
        .windowUntil("#"::equals)
        .flatMap(Flux::collectList)
        .as(StepVerifier::create)
        .assertNext(w -> assertThat(w).containsExactly("ALPHA", "#"))
        .assertNext(w -> assertThat(w).containsExactly("BETA", "#"))
        .assertNext(w -> assertThat(w).containsExactly("#"))
        .verifyComplete();
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilCutBeforeNoEmptyWindows() {
    Flux.just("ALPHA", "#", "BETA", "#")
        .windowUntil("#"::equals, true)
        .flatMap(Flux::collectList)
        .as(StepVerifier::create)
        .assertNext(w -> assertThat(w).containsExactly("ALPHA"))
        .assertNext(w -> assertThat(w).containsExactly("#", "BETA"))
        .assertNext(w -> assertThat(w).containsExactly("#"))
        .verifyComplete();
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilNoEmptyWindows() {
    Flux.just("ALPHA", "#", "BETA", "#")
        .windowUntil("#"::equals)
        .flatMap(Flux::collectList)
        .as(StepVerifier::create)
        .assertNext(w -> assertThat(w).containsExactly("ALPHA", "#"))
        .assertNext(w -> assertThat(w).containsExactly("BETA", "#"))
        .verifyComplete();
}
Code example origin: reactor/reactor-core
@Override
protected List<Scenario<String, Flux<String>>> scenarios_operatorError() {
    return Arrays.asList(
        scenario(f -> f.windowWhile(t -> {
            throw exception();
        })),
        scenario(f -> f.windowUntil(t -> {
            throw exception();
        })),
        scenario(f -> f.windowUntil(t -> {
            throw exception();
        }, true))
    );
}
Code example origin: reactor/reactor-core
@Test
public void windowUntilUnboundedStartingDelimiterReplenishes() {
    AtomicLong req = new AtomicLong();
    Flux<String> source = Flux.just("#", "1A", "1B", "1C", "#", "2A", "2B", "2C", "2D", "#", "3A").hide();
    StepVerifier.create(
            source
                .doOnRequest(req::addAndGet)
                .log("source", Level.FINE)
                .windowUntil(s -> "#".equals(s), false, 2)
                .log("windowUntil", Level.FINE)
                .concatMap(w -> w.collectList()
                                 .log("window", Level.FINE)
                        , 1)
                .log("downstream", Level.FINE)
        )
        .assertNext(l -> assertThat(l).containsExactly("#"))
        .assertNext(l -> assertThat(l).containsExactly("1A", "1B", "1C", "#"))
        .assertNext(l -> assertThat(l).containsExactly("2A", "2B", "2C", "2D", "#"))
        .assertNext(l -> assertThat(l).containsExactly("3A"))
        .expectComplete()
        .verify(Duration.ofSeconds(1));
    //TODO is there something wrong here? concatMap now falls back to no fusion because of THREAD_BARRIER, and this results in 15 request total, not 13
    assertThat(req.get()).isGreaterThanOrEqualTo(13); //11 elements + the prefetch
}
Code example origin: reactor/reactor-core
@Test
public void mismatchAtBeginningUntil() {
    StepVerifier.create(Flux.just("#", "red", "green")
                            .windowUntil(s -> s.equals("#"))
                            .flatMap(Flux::materialize)
                            .map(sig -> sig.isOnComplete() ? "END" : sig.get()))
                .expectNext("#", "END")
                .expectNext("red", "green", "END")
                .verifyComplete();
}
Code example origin: reactor/reactor-core
@Test
public void mismatchAtBeginningUntilCutBefore() {
    StepVerifier.create(Flux.just("#", "red", "green")
                            .windowUntil(s -> s.equals("#"), true)
                            .flatMap(Flux::materialize)
                            .map(sig -> sig.isOnComplete() ? "END" : sig.get()))
                .expectNext("END")
                .expectNext("#", "red", "green", "END")
                .verifyComplete();
}
Code example origin: reactor/reactor-core
@Test
public void prefetchIntegerMaxIsRequestUnboundedUntil() {
    TestPublisher<?> tp = TestPublisher.create();
    tp.flux().windowUntil(s -> true, true, Integer.MAX_VALUE).subscribe();
    tp.assertMinRequested(Long.MAX_VALUE);
}
Code example origin: reactor/reactor-core
@Test
public void apiUntil() {
    StepVerifier.create(Flux.just("red", "green", "#", "orange", "blue", "#", "black", "white")
                            .windowUntil(color -> color.equals("#"))
                            .flatMap(Flux::materialize)
                            .map(s -> s.isOnComplete() ? "WINDOW CLOSED" : s.get()))
                .expectNext("red", "green", "#", "WINDOW CLOSED")
                .expectNext("orange", "blue", "#", "WINDOW CLOSED")
                .expectNext("black", "white", "WINDOW CLOSED")
                .verifyComplete();
}
Code example origin: reactor/reactor-core
@Test
public void apiUntilCutAfter() {
    StepVerifier.create(Flux.just("red", "green", "#", "orange", "blue", "#", "black", "white")
                            .windowUntil(color -> color.equals("#"), false)
                            .flatMap(Flux::materialize)
                            .map(s -> s.isOnComplete() ? "WINDOW CLOSED" : s.get()))
                .expectNext("red", "green", "#", "WINDOW CLOSED")
                .expectNext("orange", "blue", "#", "WINDOW CLOSED")
                .expectNext("black", "white", "WINDOW CLOSED")
                .verifyComplete();
}
Code example origin: reactor/reactor-core
@Test
public void apiUntilCutBefore() {
    StepVerifier.create(Flux.just("red", "green", "#", "orange", "blue", "#", "black", "white")
                            .windowUntil(color -> color.equals("#"), true)
                            .flatMap(Flux::materialize)
                            .map(s -> s.isOnComplete() ? "WINDOW CLOSED" : s.get()))
                .expectNext("red", "green", "WINDOW CLOSED", "#")
                .expectNext("orange", "blue", "WINDOW CLOSED", "#")
                .expectNext("black", "white", "WINDOW CLOSED")
                .verifyComplete();
}
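The apiUntilCutAfter and apiUntilCutBefore tests differ only in the cutBefore flag, so it may help to see the two groupings side by side. The following is a plain-Java sketch under stated assumptions: CutBeforeSketch and its window helper are hypothetical names, the input is a finite List rather than a Flux, and the logic only mirrors the groupings that the tests in this article expect. It is not how reactor-core implements the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration class; not part of reactor-core.
final class CutBeforeSketch {

    // cutBefore = false: the matching element ends the current window.
    // cutBefore = true:  the matching element becomes the first element of the next window.
    static <T> List<List<T>> window(List<T> source, Predicate<? super T> boundaryTrigger, boolean cutBefore) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            boolean boundary = boundaryTrigger.test(item);
            if (boundary && cutBefore) {
                // Close the window as it stands (possibly empty), then start a new one with the delimiter.
                windows.add(current);
                current = new ArrayList<>();
                current.add(item);
            } else {
                current.add(item);
                if (boundary) {
                    // Cut after: the delimiter was added, so the window is complete.
                    windows.add(current);
                    current = new ArrayList<>();
                }
            }
        }
        if (!current.isEmpty()) {
            windows.add(current);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<String> colors = Arrays.asList("red", "green", "#", "orange", "blue", "#", "black", "white");
        System.out.println(window(colors, "#"::equals, false));
        // prints [[red, green, #], [orange, blue, #], [black, white]]
        System.out.println(window(colors, "#"::equals, true));
        // prints [[red, green], [#, orange, blue], [#, black, white]]
    }
}
```

In this sketch a leading delimiter with cutBefore = true produces an empty first window, which matches what the mismatchAtBeginningUntilCutBefore test shown earlier asserts. Whether a given reactor-core release still behaves that way is best checked against its own test suite.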
Code example origin: reactor/reactor-core
@Override
protected List<Scenario<String, Flux<String>>> scenarios_operatorSuccess() {
    return Arrays.asList(
        scenario(f -> f.windowUntil(t -> true, true, 1))
            .prefetch(1)
            .receive(s -> s.buffer().subscribe(b -> Assert.fail()),
                s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(0))),
                s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(1))),
                s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(2)))),
        scenario(f -> f.windowWhile(t -> true))
            .receive(s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(0), item(1), item(2)))),
        scenario(f -> f.windowUntil(t -> true))
            .receive(s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(0))),
                s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(1))),
                s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(2)))),
        scenario(f -> f.windowUntil(t -> true, false, 1))
            .prefetch(1)
            .receive(s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(0))),
                s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(1))),
                s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(2)))),
        scenario(f -> f.windowUntil(t -> false))
            .receive(s -> s.buffer().subscribe(b -> assertThat(b).containsExactly(item(0), item(1), item(2))))
    );
}
Code example origin: reactor/reactor-core
@Test
public void manualRequestWindowUntilOverRequestingSourceByPrefetch() {
    AtomicLong req = new AtomicLong();
    int prefetch = 4;
    Flux<Integer> source = Flux.range(1, 20)
                               .doOnRequest(req::addAndGet)
                               .log("source", Level.FINE)
                               .hide();
    StepVerifier.create(source.windowUntil(i -> i % 5 == 0, false, prefetch)
                              .concatMap(w -> w, 1)
                              .log("downstream", Level.FINE), 0)
                .thenRequest(2)
                .expectNext(1, 2)
                .thenRequest(6)
                .expectNext(3, 4, 5, 6, 7, 8)
                .expectNoEvent(Duration.ofMillis(100))
                .thenCancel()
                .verify();
    assertThat(req.get()).isEqualTo(8 + prefetch);
}