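This article collects code examples for the Java method reactor.core.publisher.Flux.handle() and shows how Flux.handle() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as a practical reference. Details of the Flux.handle() method are as follows: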
Fully qualified class: reactor.core.publisher.Flux
Class name: Flux
Method name: handle
Description: Handle the items emitted by this Flux by calling a BiConsumer with the output sink for each onNext. For each item, the handler may call SynchronousSink#next(Object) at most once, and may call either SynchronousSink#error(Throwable) or SynchronousSink#complete() at most once.
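To make the contract above concrete, here is a minimal plain-Java sketch that models it without depending on Reactor. The class HandleContractSketch, the nested OneShotSink, and the list-based handle helper are hypothetical names introduced only for illustration; they are not Reactor's API or its implementation. The error messages are copied from the reactor-core tests quoted in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified model of the handle() contract; this is not Reactor code.
class HandleContractSketch {

    // Models the per-item sink: at most one value, at most one terminal signal.
    static final class OneShotSink<R> {
        private R value;         // value emitted for the current item, if any
        private boolean done;    // true once complete() or error() has been called
        private Throwable error; // set when error() has been called

        void next(R item) {
            if (done) {
                throw new IllegalStateException("Cannot emit after a complete or error");
            }
            if (value != null) {
                throw new IllegalStateException("Cannot emit more than one data");
            }
            value = item;
        }

        void error(Throwable t) {
            if (done) {
                throw new IllegalStateException("Cannot error after a complete or error");
            }
            done = true;
            error = t;
        }

        void complete() {
            if (done) {
                throw new IllegalStateException("Cannot complete after a complete or error");
            }
            done = true;
        }
    }

    // Applies the handler to each item. An item for which next() is never called
    // is dropped, complete() stops processing early, and error() is rethrown.
    static <T, R> List<R> handle(List<T> source, BiConsumer<T, OneShotSink<R>> handler) {
        List<R> out = new ArrayList<>();
        OneShotSink<R> sink = new OneShotSink<>();
        for (T item : source) {
            sink.value = null; // the "at most one next()" rule applies per item
            handler.accept(item, sink);
            if (sink.value != null) {
                out.add(sink.value);
            }
            if (sink.error != null) {
                throw new RuntimeException(sink.error);
            }
            if (sink.done) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Keep even numbers and double them: a map and a filter in a single step.
        List<Integer> doubledEvens = HandleContractSketch.<Integer, Integer>handle(
                Arrays.asList(1, 2, 3, 4),
                (v, sink) -> {
                    if (v % 2 == 0) {
                        sink.next(v * 2);
                    }
                });
        System.out.println(doubledEvens); // prints [4, 8]
    }
}
```

With the real operator the same handler shape applies: skipping the next() call filters an item out, and calling it maps the item, which is why handle() is often described as a combined map and filter. The tests below exercise the failure cases of this contract against Reactor itself.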
Code example source: reactor/reactor-core
@Test
public void nextAfterErrorFused() {
    StepVerifier.create(Flux.just(1)
            .handle((v, sink) -> {
                // error() terminates the sink, so the next() that follows is rejected
                sink.error(new NullPointerException("boom"));
                sink.next(2);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void completeAfterErrorFused() {
    StepVerifier.create(Flux.just(1)
            .handle((v, sink) -> {
                // only one terminal signal is allowed: complete() after error() is rejected
                sink.error(new NullPointerException("boom"));
                sink.complete();
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot complete after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterErrorFusedConditional() {
    StepVerifier.create(Flux.just(1)
            .filter(i -> true)
            .handle((v, sink) -> {
                // error() terminates the sink, so the next() that follows is rejected
                sink.error(new NullPointerException("boom"));
                sink.next(2);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void completeAfterErrorFusedConditional() {
    StepVerifier.create(Flux.just(1)
            .filter(i -> true)
            .handle((v, sink) -> {
                // only one terminal signal is allowed: complete() after error() is rejected
                sink.error(new NullPointerException("boom"));
                sink.complete();
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot complete after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void completeAfterErrorNormal() {
    StepVerifier.create(Flux.just(1)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.complete();
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot complete after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterNextFused() {
    StepVerifier.create(Flux.just(1)
            .handle((v, sink) -> {
                // a second next() for the same input item violates the contract
                sink.next(v);
                sink.next(v + 1);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit more than one data"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterErrorNormal() {
    StepVerifier.create(Flux.just(1)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.next(2);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void errorAfterCompleteNormal() {
    StepVerifier.create(Flux.just(1)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .handle((v, sink) -> {
                // only one terminal signal is allowed: error() after complete() is rejected
                sink.complete();
                sink.error(new NullPointerException("boom"));
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot error after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void errorAfterCompleteFusedConditional() {
    StepVerifier.create(Flux.just(1)
            .filter(i -> true)
            .handle((v, sink) -> {
                // only one terminal signal is allowed: error() after complete() is rejected
                sink.complete();
                sink.error(new NullPointerException("boom"));
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot error after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void completeAfterErrorNormalConditional() {
    StepVerifier.create(Flux.just(1)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .filter(i -> true)
            .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.complete();
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot complete after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterCompleteFusedConditional() {
    StepVerifier.create(Flux.just(1)
            .filter(i -> true)
            .handle((v, sink) -> {
                // complete() terminates the sink, so the next() that follows is rejected
                sink.complete();
                sink.next(2);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterCompleteNormal() {
    StepVerifier.create(Flux.just(1)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .handle((v, sink) -> {
                // complete() terminates the sink, so the next() that follows is rejected
                sink.complete();
                sink.next(2);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterNextNormal() {
    StepVerifier.create(Flux.just(1)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .handle((v, sink) -> {
                // a second next() for the same input item violates the contract
                sink.next(v);
                sink.next(v + 1);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit more than one data"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterNextFusedConditional() {
    StepVerifier.create(Flux.just(1)
            .filter(i -> true)
            .handle((v, sink) -> {
                // a second next() for the same input item violates the contract
                sink.next(v);
                sink.next(v + 1);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit more than one data"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterCompleteNormalConditional() {
    StepVerifier.create(Flux.just(1)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .filter(i -> true)
            .handle((v, sink) -> {
                sink.complete();
                sink.next(2);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit after a complete or error"));
}
Code example source: reactor/reactor-core
@Test
public void nextAfterNextNormalConditional() {
    StepVerifier.create(Flux.just(1)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .filter(i -> true)
            .handle((v, sink) -> {
                sink.next(v);
                sink.next(v + 1);
            }))
            .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                    .hasMessage("Cannot emit more than one data"));
}
Code example source: reactor/reactor-core
@Test
public void normalHide() {
    Set<Integer> expectedValues = new HashSet<>(Arrays.asList(2, 4, 6, 8, 10));
    Flux.range(1, 5)
            .hide() // hide() prevents operator fusion, so the non-fused path is tested
            .handle((v, s) -> s.next(v * 2)) // exactly one next() per item: behaves like map()
            .subscribeWith(AssertSubscriber.create())
            .assertContainValues(expectedValues)
            .assertNoError()
            .assertComplete();
}
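Code example source: reactor/reactor-core
@Test
public void contextTest() {
    // Each repeat resubscribes, so the handler increments the shared counter
    // stored in the Context once per subscription (10 times in total).
    // Note: subscriberContext(...) was deprecated in Reactor 3.4 in favor of contextWrite(...).
    StepVerifier.create(Flux.just("foo")
            .handle((d, s) -> s.next(s.currentContext()
                    .get(AtomicInteger.class)
                    .incrementAndGet()))
            .repeat(9)
            .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                    new AtomicInteger())))
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .verifyComplete();
}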
Code example source: reactor/reactor-core
@Test
public void dropHandleFusedSync() {
    StepVerifier.create(Flux.just("test", "test2")
            .handle((data, s) -> {
                // the handler never calls next(), so both items are dropped
            })
            .filter(t -> true))
            .expectFusion(Fuseable.SYNC)
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void normalSyncFusion() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Set<Integer> expectedValues = new HashSet<>(Arrays.asList(2, 4, 6, 8, 10));
    // SYNC here assumes a static import of reactor.core.Fuseable.SYNC
    ts.requestedFusionMode(SYNC);
    Flux.range(1, 5).<Integer>handle((v, s) -> s.next(v * 2)).subscribe(ts);
    ts.assertContainValues(expectedValues)
            .assertNoError()
            .assertComplete()
            .assertFuseableSource()
            .assertFusionMode(SYNC);
}