Usage and code examples of the reactor.core.publisher.Flux.blockLast() method

Reposted by x33g5p2x on 2022-01-19, category: Other

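This article collects code examples for the Java method reactor.core.publisher.Flux.blockLast() and shows how Flux.blockLast() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. The details of Flux.blockLast() are as follows:
Fully qualified class: reactor.core.publisher.Flux
Class name: Flux
Method name: blockLast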

About Flux.blockLast

Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
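
The three outcomes described above (a last value, an empty completion, and an error) can be sketched as follows. This is a minimal illustration that assumes reactor-core is on the classpath; the class name BlockLastOutcomesDemo and its method names are hypothetical and not part of the library.

```java
import java.io.IOException;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of reactor-core.
final class BlockLastOutcomesDemo {

    // A non-empty Flux: blockLast() returns the final element.
    static Integer lastOfRange() {
        return Flux.range(1, 5).blockLast();
    }

    // An empty Flux: blockLast() returns null once the Flux completes.
    static Object lastOfEmpty() {
        return Flux.empty().blockLast();
    }

    // A Flux failing with a checked exception: blockLast() throws a
    // RuntimeException whose cause is the original exception.
    static Throwable causeOfCheckedError() {
        try {
            Flux.error(new IOException("boom")).blockLast();
            return null; // not reached when the error is propagated
        } catch (RuntimeException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(lastOfRange());                              // 5
        System.out.println(lastOfEmpty());                              // null
        System.out.println(causeOfCheckedError().getClass().getName()); // java.io.IOException
    }
}
```

Because null is a legitimate return value, callers that unbox the result (for example with intValue()) should be sure the Flux cannot complete empty.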

Note that each blockLast() will trigger a new subscription: in other words, the result might miss signal from hot publishers.
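
The note about new subscriptions can be illustrated with a cold and a hot source. This is a sketch under assumptions: it relies on the Sinks API, which to my knowledge is available in reactor-core 3.4 and later, and the class and method names are hypothetical. The hot case uses a direct best-effort multicast sink, which does not replay earlier signals to late subscribers.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Hypothetical demo class, not part of reactor-core.
final class BlockLastSubscriptionDemo {

    // Cold source: every blockLast() call subscribes again.
    static int countSubscriptions() {
        AtomicInteger subscriptions = new AtomicInteger();
        Flux<Integer> cold = Flux.range(1, 3)
                                 .doOnSubscribe(s -> subscriptions.incrementAndGet());
        cold.blockLast();
        cold.blockLast();
        return subscriptions.get();
    }

    // Hot source: values pushed before blockLast() subscribes are not replayed,
    // so the late subscriber only observes the completion signal.
    static Integer lateSubscriberOnHotSource() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().directBestEffort();
        sink.tryEmitNext(1);    // no subscriber yet, the value is not delivered
        sink.tryEmitNext(2);    // same here
        sink.tryEmitComplete(); // the sink is now terminated
        return sink.asFlux().blockLast();
    }

    public static void main(String[] args) {
        System.out.println(countSubscriptions());        // 2
        System.out.println(lateSubscriberOnHotSource()); // null
    }
}
```

If a hot source never completes after the late subscription, blockLast() without a timeout would block forever, which is one reason to prefer the overload that takes a Duration.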

Code examples

Code example source: reactor/reactor-core

@Test
public void deferStream() {
  AtomicInteger i = new AtomicInteger();

  Flux<Integer> source =
      Flux.defer(() -> Flux.just(i.incrementAndGet()));

  // Each blockLast() call subscribes again, so the deferred supplier runs once per call.
  Assert.assertEquals(1, source.blockLast().intValue());
  Assert.assertEquals(2, source.blockLast().intValue());
  Assert.assertEquals(3, source.blockLast().intValue());
}

Code example source: reactor/reactor-core

@Test
public void syncCancelBeforeComplete() {
  assertThat(Mono.just(Mono.just(1).publish(v -> v)).flatMapMany(v -> v).blockLast()).isEqualTo(1);
}

Code example source: reactor/reactor-core

@Test
public void bufferWillSubdivideAnInputFluxOverlap() {
  Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8);
  //"overlapping buffers": size 3, skip 2, so consecutive buffers share one element
  List<List<Integer>> res = numbers.buffer(3, 2)
                   .buffer()
                   .blockLast();
  assertThat(res).containsExactly(
      Arrays.asList(1, 2, 3),
      Arrays.asList(3, 4, 5),
      Arrays.asList(5, 6, 7),
      Arrays.asList(7, 8));
}

Code example source: reactor/reactor-core

@Test
public void bufferWillSubdivideAnInputFlux() {
  Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8);
  //"non overlapping buffers": size 2, skip 3, so every third element is dropped
  List<List<Integer>> res = numbers.buffer(2, 3)
                   .buffer()
                   .blockLast();
  assertThat(res).containsExactly(Arrays.asList(1, 2),
      Arrays.asList(4, 5),
      Arrays.asList(7, 8));
}

Code example source: reactor/reactor-core

@Test
public void normalCancelBeforeComplete() {
  assertThat(Mono.just(Mono.just(1).hide().publish(v -> v)).flatMapMany(v -> v).blockLast()).isEqualTo(1);
}

Code example source: reactor/reactor-core

@Test
public void windowWillSubdivideAnInputFluxOverlap() {
  Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8);
  //"overlapping windows": size 3, skip 2, so consecutive windows share one element
  List<List<Integer>> res = numbers.window(3, 2)
                   .concatMap(Flux::buffer)
                   .buffer()
                   .blockLast();
  assertThat(res).containsExactly(
      Arrays.asList(1, 2, 3),
      Arrays.asList(3, 4, 5),
      Arrays.asList(5, 6, 7),
      Arrays.asList(7, 8));
}

Code example source: reactor/reactor-core

@Test
public void fluxBlockLastDoesntCancel() {
  AtomicLong cancelCount = new AtomicLong();
  Flux.range(1, 10)
    .doOnCancel(cancelCount::incrementAndGet)
    .blockLast();
  assertThat(cancelCount.get()).isEqualTo(0);
}

Code example source: reactor/reactor-core

@Override
public DefaultStepVerifierBuilder<T> expectNoAccessibleContext() {
  return consumeSubscriptionWith(sub -> {
        Scannable lowest = Scannable.from(sub);
        Scannable verifierSubscriber = Scannable.from(lowest.scan(Scannable.Attr.ACTUAL));
        Context c = Flux.fromStream(verifierSubscriber.parents())
                .ofType(CoreSubscriber.class)
                .map(CoreSubscriber::currentContext)
                .blockLast();
        if (c != null) {
          throw errorFormatter.assertionError("Expected no accessible Context, got " + c);
        }
      });
}

Code example source: reactor/reactor-core

@Test(expected = RuntimeException.class)
public void blockingLastError2() {
  Flux.defer(() -> Mono.error(new RuntimeException("test")))
    .subscribeOn(scheduler)
    .blockLast(Duration.ofSeconds(1));
}

Code example source: reactor/reactor-core

@Test
public void severalInARowExecutedInReverseOrder() {
  Queue<String> finallyOrder = new ConcurrentLinkedDeque<>();
  Flux.just("b")
    .hide()
    .doFinally(s -> finallyOrder.offer("FIRST"))
    .doFinally(s -> finallyOrder.offer("SECOND"))
    .blockLast();
  Assertions.assertThat(finallyOrder)
       .containsExactly("SECOND", "FIRST");
}

Code example source: reactor/reactor-core

@Test
public void blockingLastTimeout() {
  assertThat(Flux.empty()
          .blockLast(Duration.ofMillis(1))).isNull();
}
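
The test above completes before the 1 ms timeout can expire, so it does not exercise the timeout path itself. The following sketch shows what happens when the timeout does elapse. It assumes reactor-core is on the classpath and that an expired timeout surfaces as an IllegalStateException, which matches the reactor-core versions I am aware of but is worth checking against the Javadoc of the version in use. The class and method names are hypothetical.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of reactor-core.
final class BlockLastTimeoutDemo {

    // Flux.never() emits no signal at all, so only the timeout can end the wait.
    static boolean timedOut() {
        try {
            Flux.never().blockLast(Duration.ofMillis(50));
            return false; // not expected: the source never completes
        } catch (IllegalStateException e) {
            // Assumption: the expired timeout is reported as an IllegalStateException.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timedOut());
    }
}
```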

Code example source: reactor/reactor-core

@Test
public void blockingLast2() {
  Assert.assertEquals((Integer) 10,
      Flux.range(1, 10)
        .publishOn(scheduler)
        .blockLast(Duration.ofSeconds(10)));
}

Code example source: reactor/reactor-core

@Test(expected = RuntimeException.class)
public void blockingLastError() {
  Flux.defer(() -> Mono.error(new RuntimeException("test")))
    .subscribeOn(scheduler)
    .blockLast();
}

Code example source: reactor/reactor-core

@Test
public void syncCancelBeforeComplete() {
  assertThat(Flux.just(Flux.just(1).publish(v -> v)).flatMap(v -> v).blockLast()).isEqualTo(1);
}

Code example source: reactor/reactor-core

@Test
public void fromPublishersSequentialSubscribe() {
  List<Integer> values = Collections.synchronizedList(new ArrayList<>(10));
  ParallelFlux.from(Flux.range(1, 3), Flux.range(4, 3))
        .runOn(Schedulers.parallel())
        .doOnNext(values::add)
        .sequential()
        .blockLast();
  assertThat(values)
       .hasSize(6)
       .containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
}

Code example source: reactor/reactor-core

@Test
public void normalCancelBeforeComplete() {
  assertThat(Flux.just(Flux.just(1).hide().publish(v -> v)).flatMap(v -> v).blockLast()).isEqualTo(1);
}

Code example source: reactor/reactor-core

@Test
public void blockingLast() {
  Assert.assertEquals((Integer) 10,
      Flux.range(1, 10)
        .publishOn(scheduler)
        .blockLast());
}

Code example source: reactor/reactor-core

@Test
public void FluxMetricsFusedNoOp() {
  assertThatCode(() -> Flux.just("foo").metrics().blockLast())
      .doesNotThrowAnyException();
}

Code example source: reactor/reactor-core

@Test
public void FluxMetricsNoOp() {
  assertThatCode(() -> Flux.just("foo").hide().metrics().blockLast())
      .doesNotThrowAnyException();
}
