reactor.core.publisher.Flux.handle()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.4k)|赞(0)|评价(0)|浏览(209)

本文整理了Java中reactor.core.publisher.Flux.handle()方法的一些代码示例,展示了Flux.handle()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.handle()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:handle

Flux.handle介绍

[英]Handle the items emitted by this Flux by calling a biconsumer with the output sink for each onNext. At most one SynchronousSink#next(Object)call must be performed and/or 0 or 1 SynchronousSink#error(Throwable) or SynchronousSink#complete().
[中]通过使用每个onNext的输出接收器调用biconsumer来处理此流量发出的项。最多必须执行一个SynchronousSink#next(Object)调用和/或0或1个SynchronousSink#error(Throwable)或SynchronousSink#complete()。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterErrorFused() {
  StepVerifier.create(Flux.just(1)
              .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.next(2);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot emit after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void completeAfterErrorFused() {
  StepVerifier.create(Flux.just(1)
              .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.complete();
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot complete after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterErrorFusedConditional() {
  StepVerifier.create(Flux.just(1)
              .filter(i -> true)
              .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.next(2);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot emit after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void completeAfterErrorFusedConditional() {
  StepVerifier.create(Flux.just(1)
              .filter(i -> true)
              .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.complete();
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot complete after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void completeAfterErrorNormal() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.complete();
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot complete after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterNextFused() {
  StepVerifier.create(Flux.just(1)
              .handle((v, sink) -> {
                sink.next(v);
                sink.next(v + 1);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot emit more than one data"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterErrorNormal() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.next(2);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot emit after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorAfterCompleteNormal() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .handle((v, sink) -> {
                sink.complete();
                sink.error(new NullPointerException("boom"));
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot error after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorAfterCompleteFusedConditional() {
  StepVerifier.create(Flux.just(1)
              .filter(i -> true)
              .handle((v, sink) -> {
                sink.complete();
                sink.error(new NullPointerException("boom"));
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot error after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void completeAfterErrorNormalConditional() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .filter(i -> true)
              .handle((v, sink) -> {
                sink.error(new NullPointerException("boom"));
                sink.complete();
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot complete after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterCompleteFusedConditional() {
  StepVerifier.create(Flux.just(1)
              .filter(i -> true)
              .handle((v, sink) -> {
                sink.complete();
                sink.next(2);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
        .hasMessage("Cannot emit after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterCompleteNormal() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .handle((v, sink) -> {
                sink.complete();
                sink.next(2);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
        .hasMessage("Cannot emit after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterNextNormal() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .handle((v, sink) -> {
                sink.next(v);
                sink.next(v + 1);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot emit more than one data"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterNextFusedConditional() {
  StepVerifier.create(Flux.just(1)
              .filter(i -> true)
              .handle((v, sink) -> {
                sink.next(v);
                sink.next(v + 1);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot emit more than one data"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterCompleteNormalConditional() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .filter(i -> true)
              .handle((v, sink) -> {
                sink.complete();
                sink.next(2);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
        .hasMessage("Cannot emit after a complete or error"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void nextAfterNextNormalConditional() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .filter(i -> true)
              .handle((v, sink) -> {
                sink.next(v);
                sink.next(v + 1);
              }))
        .verifyErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
                            .hasMessage("Cannot emit more than one data"));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalHide() {
  Set<Integer> expectedValues = new HashSet<>(Arrays.asList(2, 4, 6, 8, 10));
  Flux.range(1, 5)
    .hide()
    .handle((v, s) -> s.next(v * 2))
    .subscribeWith(AssertSubscriber.create())
    .assertContainValues(expectedValues)
    .assertNoError()
    .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void contextTest() {
  StepVerifier.create(Flux.just("foo")
              .handle((d, s) -> s.next(s.currentContext()
                          .get(AtomicInteger.class)
                          .incrementAndGet()))
              .repeat(9)
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                  new AtomicInteger())))
        .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void dropHandleFusedSync() {
  StepVerifier.create(Flux.just("test", "test2")
              .handle((data, s) -> {
              })
              .filter(t -> true))
        .expectFusion(Fuseable.SYNC)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalSyncFusion() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Set<Integer> expectedValues = new HashSet<>(Arrays.asList(2, 4, 6, 8, 10));
  ts.requestedFusionMode(SYNC);
  Flux.range(1, 5).<Integer>handle((v, s) -> s.next(v * 2)).subscribe(ts);
  ts.assertContainValues(expectedValues)
   .assertNoError()
   .assertComplete()
   .assertFuseableSource()
   .assertFusionMode(SYNC);
}

相关文章

Flux类方法