reactor.core.publisher.Flux.blockFirst()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(569)

本文整理了Java中reactor.core.publisher.Flux.blockFirst()方法的一些代码示例,展示了Flux.blockFirst()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.blockFirst()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:blockFirst

Flux.blockFirst介绍

[英]Subscribe to this Flux and block indefinitely until the upstream signals its first value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

Note that each blockFirst() will trigger a new subscription: in other words, the result might miss signal from hot publishers.
[中]订阅此流量并无限期阻塞,直到上游发出其第一个值或完成。返回该值,如果通量为空,则返回null。在通量错误的情况下,将抛出原始异常(如果它是已检查的异常,则包装在RuntimeException中)。
请注意,每个blockFirst()都将触发一个新订阅:换句话说,结果可能会错过来自热门发布服务器的信号。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test(expected = RuntimeException.class)
public void blockingFirstError2() {
  Flux.error(new RuntimeException("test"))
    .publishOn(scheduler)
    .blockFirst(Duration.ofSeconds(1));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxBlockFirstCancelsOnce() {
  AtomicLong cancelCount = new AtomicLong();
  Flux.range(1, 10)
    .doOnCancel(cancelCount::incrementAndGet)
    .blockFirst();
  assertThat(cancelCount.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = RuntimeException.class)
public void blockingFirstError() {
  Flux.error(new RuntimeException("test"))
    .publishOn(scheduler)
    .blockFirst();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void blockingFirst2() {
  Assert.assertEquals((Integer) 1,
      Flux.range(1, 10)
        .publishOn(scheduler)
        .blockFirst(Duration.ofSeconds(10)));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void blockingFirstTimeout() {
  assertThat(Flux.empty()
          .blockFirst(Duration.ofMillis(1))).isNull();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void blockingFirst() {
  Assert.assertEquals((Integer) 1,
      Flux.range(1, 10)
        .publishOn(scheduler)
        .blockFirst());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanMainSubscriber() {
  CoreSubscriber<Flux<Integer>> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
  FluxWindowPredicate.WindowPredicateMain<Integer> test = new FluxWindowPredicate.WindowPredicateMain<>(actual,
      Queues.<Flux<Integer>>unbounded().get(), Queues.unbounded(), 123, i -> true, Mode.WHILE);
  Subscription parent = Operators.emptySubscription();
  test.onSubscribe(parent);
  Assertions.assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
  Assertions.assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
  Assertions.assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(123);
  test.requested = 35;
  Assertions.assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(35);
  test.queue.offer(Flux.just(1).groupBy(i -> i).blockFirst());
  Assertions.assertThat(test.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
  Assertions.assertThat(test.scan(Scannable.Attr.ERROR)).isNull();
  test.error = new IllegalStateException("boom");
  Assertions.assertThat(test.scan(Scannable.Attr.ERROR)).hasMessage("boom");
  Assertions.assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
  test.onComplete();
  Assertions.assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
  Assertions.assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
  test.cancel();
  Assertions.assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void adaptRequest() throws Exception {
  TestHttpHandler handler = new TestHttpHandler(response -> {
    response.setStatusCode(HttpStatus.OK);
    return response.setComplete();
  });
  new HttpHandlerConnector(handler).connect(HttpMethod.POST, URI.create("/custom-path"),
      request -> {
        request.getHeaders().put("custom-header", Arrays.asList("h0", "h1"));
        request.getCookies().add("custom-cookie", new HttpCookie("custom-cookie", "c0"));
        return request.writeWith(Mono.just(toDataBuffer("Custom body")));
      }).block(Duration.ofSeconds(5));
  MockServerHttpRequest request = (MockServerHttpRequest) handler.getSavedRequest();
  assertEquals(HttpMethod.POST, request.getMethod());
  assertEquals("/custom-path", request.getURI().toString());
  HttpHeaders headers = request.getHeaders();
  assertEquals(Arrays.asList("h0", "h1"), headers.get("custom-header"));
  assertEquals(new HttpCookie("custom-cookie", "c0"), request.getCookies().getFirst("custom-cookie"));
  assertEquals(Collections.singletonList("custom-cookie=c0"), headers.get(HttpHeaders.COOKIE));
  DataBuffer buffer = request.getBody().blockFirst(Duration.ZERO);
  assertEquals("Custom body", DataBufferTestUtils.dumpString(buffer, UTF_8));
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void adaptResponse() throws Exception {
  ResponseCookie cookie = ResponseCookie.from("custom-cookie", "c0").build();
  TestHttpHandler handler = new TestHttpHandler(response -> {
    response.setStatusCode(HttpStatus.OK);
    response.getHeaders().put("custom-header", Arrays.asList("h0", "h1"));
    response.addCookie(cookie);
    return response.writeWith(Mono.just(toDataBuffer("Custom body")));
  });
  ClientHttpResponse response = new HttpHandlerConnector(handler)
      .connect(HttpMethod.GET, URI.create("/custom-path"), ReactiveHttpOutputMessage::setComplete)
      .block(Duration.ofSeconds(5));
  assertEquals(HttpStatus.OK, response.getStatusCode());
  HttpHeaders headers = response.getHeaders();
  assertEquals(Arrays.asList("h0", "h1"), headers.get("custom-header"));
  assertEquals(cookie, response.getCookies().getFirst("custom-cookie"));
  assertEquals(Collections.singletonList("custom-cookie=c0"), headers.get(HttpHeaders.SET_COOKIE));
  DataBuffer buffer = response.getBody().blockFirst(Duration.ZERO);
  assertEquals("Custom body", DataBufferTestUtils.dumpString(buffer, UTF_8));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxBlockFirstForbidden() {
  Function<String, String> badMapper = v -> Flux.just(v).hide()
                         .blockFirst();
  Function<String, String> badMapperTimeout = v -> Flux.just(v).hide()
                             .blockFirst(Duration.ofMillis(100));
  Mono<String> forbiddenSequence1 = Mono.just("data")
                     .publishOn(nonBlockingScheduler)
                     .map(badMapper);
  StepVerifier.create(forbiddenSequence1)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
  Mono<String> forbiddenSequence2 = Mono.just("data")
                     .publishOn(nonBlockingScheduler)
                     .map(badMapperTimeout);
  StepVerifier.create(forbiddenSequence2)
        .expectErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-"))
        .verify();
}

代码示例来源:origin: micronaut-projects/micronaut-core

@Test
public void testClientAnnotationStreaming() throws Exception {
  try( EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class) ) {
    HeadlineClient headlineClient = embeddedServer
        .getApplicationContext()
        .getBean(HeadlineClient.class);
    Event<Headline> headline = headlineClient.streamHeadlines().blockFirst();
    assertNotNull( headline );
    assertTrue( headline.getData().getText().startsWith("Latest Headline") );
  }
}
// end::streamingClient[]

代码示例来源:origin: spring-projects/spring-restdocs

private ByteArrayOutputStream readPartBodyContent(Part part) {
  ByteArrayOutputStream contentStream = new ByteArrayOutputStream();
  DataBufferUtils.write(part.content(), contentStream).blockFirst();
  return contentStream;
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @return
 * @see reactor.core.publisher.Flux#blockFirst()
 */
public final T blockFirst() {
  return boxed.blockFirst();
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param d
 * @return
 * @see reactor.core.publisher.Flux#blockFirst(java.time.Duration)
 */
public final T blockFirst(Duration d) {
  return boxed.blockFirst(d);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

@Override
public T firstValue() {
  return flux.blockFirst();
}

代码示例来源:origin: rsocket/rsocket-java

@Test
public void testChannelRequestCancellation() {
 MonoProcessor<Void> cancelled = MonoProcessor.create();
 Flux<Payload> request = Flux.<Payload>never().doOnCancel(cancelled::onComplete);
 rule.socket.requestChannel(request).subscribe().dispose();
 Flux.first(
     cancelled,
     Flux.error(new IllegalStateException("Channel request not cancelled"))
       .delaySubscription(Duration.ofSeconds(1)))
   .blockFirst();
}

代码示例来源:origin: rsocket/rsocket-java

@Test(timeout = 5_000L)
public void testZeroPayload() {
 handler =
   new AbstractRSocket() {
    @Override
    public Flux<Payload> requestStream(Payload payload) {
     return Flux.just(EmptyPayload.INSTANCE);
    }
   };
 RSocket client = buildClient();
 Payload result = client.requestStream(DefaultPayload.create("REQUEST", "META")).blockFirst();
 assertEquals("", result.getDataUtf8());
}

代码示例来源:origin: mokies/ratelimitj

public RedisScriptLoader(RedisScriptingReactiveCommands<String, String> redisScriptingCommands, String scriptUri, boolean eagerLoad) {
  requireNonNull(redisScriptingCommands);
  this.redisScriptingCommands = redisScriptingCommands;
  this.scriptUri = requireNonNull(scriptUri);
  this.storedScript = new AtomicReference<>(loadScript());
  if (eagerLoad) {
    this.storedScript.get().doOnComplete(() -> LOG.info("Redis Script eager load complete")).blockFirst(Duration.ofSeconds(10));
  }
}

代码示例来源:origin: mokies/ratelimitj

@Test
@DisplayName("should load rate limit lua script into Redis")
void shouldLoadScript() {
  RedisScriptLoader scriptLoader = new RedisScriptLoader(extension.getScriptingReactiveCommands(), "hello-world.lua");
  scriptFlush();
  String sha = scriptLoader.storedScript().block(Duration.ofSeconds(5)).getSha();
  assertThat(sha).isNotEmpty();
  assertThat(extension.getScriptingReactiveCommands().scriptExists(sha).blockFirst()).isTrue();
}

代码示例来源:origin: mokies/ratelimitj

@Test
@DisplayName("should eagerly load rate limit lua script into Redis")
void shouldEagerlyLoadScript() {
  RedisScriptLoader scriptLoader = new RedisScriptLoader(extension.getScriptingReactiveCommands(), "hello-world.lua", true);
  String sha = scriptLoader.storedScript().block(Duration.ofSeconds(5)).getSha();
  assertThat(sha).isNotEmpty();
  scriptFlush();
  new RedisScriptLoader(extension.getScriptingReactiveCommands(), "hello-world.lua", true);
  assertThat(extension.getScriptingReactiveCommands().scriptExists(sha).blockFirst()).isTrue();
}

相关文章

Flux类方法