本文整理了Java中reactor.core.publisher.Flux.filterWhen()
方法的一些代码示例,展示了Flux.filterWhen()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.filterWhen()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:filterWhen
[英]Test each value emitted by this Flux asynchronously using a generated Publisher test. A value is replayed if the first item emitted by its corresponding test is true. It is dropped if its test is either empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it is a Mono, test will be cancelled after receiving that first value. Test publishers are generated and subscribed to in sequence.
[中]使用生成的发布者测试异步测试此流量发出的每个值。如果相应测试发出的第一项为真,则会重放该值。如果其测试为空或其第一个发出的值为false,则会丢弃它。
请注意,只考虑测试发布者的第一个值,除非它是Mono,否则在收到第一个值后,测试将被取消。按顺序生成并订阅测试发布服务器。
代码示例来源:origin: reactor/reactor-core
@Test
public void bufferSizeIsAlsoPrefetch() {
AtomicLong requested = new AtomicLong();
Flux.range(1, 10)
.hide()
.doOnRequest(r -> requested.compareAndSet(0, r))
.filterWhen(v -> Mono.just(v % 2 == 0), 5)
.subscribe().dispose();
assertThat(requested.get()).isEqualTo(5);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void innerMonoNotCancelled() {
AtomicInteger cancelCount = new AtomicInteger();
StepVerifier.create(Flux.range(1, 3)
.filterWhen(v -> Mono.just(true)
.doOnCancel(cancelCount::incrementAndGet)))
.expectNext(1, 2, 3)
.verifyComplete();
assertThat(cancelCount.get()).isEqualTo(0);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void error() {
StepVerifier.create(Flux.<Integer>error(new IllegalStateException())
.filterWhen(v -> Mono.just(true)))
.verifyError(IllegalStateException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void empty() {
StepVerifier.create(Flux.<Integer>empty()
.filterWhen(v -> Mono.just(true)))
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void allEmptyFused() {
StepVerifier.create(Flux.range(1, 10)
.filterWhen(v -> Mono.empty()))
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void emptyBackpressured() {
StepVerifier.create(Flux.<Integer>empty()
.filterWhen(v -> Mono.just(true)), 0L)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void predicateThrows() {
StepVerifier.create(Flux.just(1)
.filterWhen(v -> { throw new IllegalStateException(); }))
.verifyError(IllegalStateException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
StepVerifier.withVirtualTime(() -> Flux.range(1, 10)
.filterWhen(v -> Mono.just(v % 2 == 0)
.delayElement(Duration.ofMillis(100))))
.thenAwait(Duration.ofSeconds(5))
.expectNext(2, 4, 6, 8, 10)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void filterAllOut() {
final int[] calls = { 0 };
StepVerifier.create(
Flux.range(1, 1000)
.doOnNext(v -> calls[0]++)
.filterWhen(v -> Mono.just(false), 16)
.flatMap(ignore -> Flux.just(0)))
.verifyComplete();
assertThat(calls[0]).isEqualTo(1000);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void backpressureExactlyOne() {
StepVerifier.create(Flux.just(1)
.filterWhen(v -> Mono.just(true)), 1L)
.expectNext(1)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void predicateErrorFused() {
StepVerifier.create(Flux.just(1)
.filterWhen(v -> Mono.fromCallable(() -> { throw new IllegalStateException(); })))
.verifyError(IllegalStateException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalSyncFused() {
StepVerifier.create(Flux.range(1, 10)
.filterWhen(v -> Mono.just(v % 2 == 0)))
.expectNext(2, 4, 6, 8, 10)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void allEmpty() {
StepVerifier.create(Flux.range(1, 10)
.filterWhen(v -> Mono.<Boolean>empty().hide()))
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void longSourceFused() {
StepVerifier.create(Flux.range(1, 1000)
.filterWhen(v -> Mono.just(true)))
.expectNextCount(1000)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void filterAllOutHidden() {
final int[] calls = { 0 };
StepVerifier.create(
Flux.range(1, 1000)
.doOnNext(v -> calls[0]++)
.filterWhen(v -> Mono.just(false).hide(), 16)
.flatMap(ignore -> Flux.just(0)))
.verifyComplete();
assertThat(calls[0]).isEqualTo(1000);
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void predicateNull() {
StepVerifier.create(Flux.just(1).filterWhen(v -> null))
.verifyError(NullPointerException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalSync() {
StepVerifier.create(Flux.range(1, 10)
.filterWhen(v -> Mono.just(v % 2 == 0).hide()))
.expectNext(2, 4, 6, 8, 10)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void innerFluxOnlyConsidersFirstValue() {
StepVerifier.create(Flux.range(1, 3)
.filterWhen(v -> Flux.just(false, true, true)))
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void take() {
StepVerifier.create(Flux.range(1, 10)
.filterWhen(v -> Mono.just(v % 2 == 0).hide())
.take(1))
.expectNext(2)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void longSourceSingleStep() {
StepVerifier.create(Flux.range(1, 1000)
.filterWhen(v -> Flux.just(true).limitRate(1)))
.expectNextCount(1000)
.verifyComplete();
}
内容来源于网络,如有侵权,请联系作者删除!