Usage and code examples for the reactor.core.publisher.Flux.doOnRequest() method

Reposted by x33g5p2x on 2022-01-19, category: Other

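This article collects code examples for the Java method reactor.core.publisher.Flux.doOnRequest() and shows how Flux.doOnRequest() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.doOnRequest() method:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: doOnRequest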

About Flux.doOnRequest

Adds behavior (a side effect) that triggers a LongConsumer whenever this Flux receives a request.

Note that a non-fatal error raised in the callback is not propagated; it simply triggers Operators#onOperatorError(Throwable, Context).
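To make the "side effect on request" idea concrete, here is a minimal conceptual sketch built on the JDK's own java.util.concurrent.Flow interfaces rather than on Reactor. It is not Reactor's implementation: the names RequestTapSketch, range, tapRequests, and recordRequests are hypothetical, and argument validation and error handling are left out. It only shows the general shape of such an operator: wrap the Subscription, call the LongConsumer with the requested amount, then forward the request upstream unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.LongConsumer;

// Conceptual sketch only; all names here are hypothetical and this is not Reactor code.
final class RequestTapSketch {

    // Tiny synchronous publisher that emits start..start+count-1 as demand arrives.
    // Simplifications: no validation of n <= 0, completion is only signalled from request().
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            long demand;
            boolean emitting;
            boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                demand += n;
                if (emitting) return; // re-entrant call: the running loop picks up the new demand
                emitting = true;
                while (demand > 0 && !done && next < start + count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next == start + count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Wraps a publisher so that every request(n) from downstream first invokes onRequest.
    static <T> Flow.Publisher<T> tapRequests(Flow.Publisher<T> source, LongConsumer onRequest) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription upstream) {
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override
                    public void request(long n) {
                        onRequest.accept(n);  // the side effect
                        upstream.request(n);  // demand is forwarded unchanged
                    }

                    @Override
                    public void cancel() {
                        upstream.cancel();
                    }
                });
            }

            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    // Subscribes with an initial request of 2, then asks for 1 more after every item.
    static List<Long> recordRequests() {
        List<Long> requests = new ArrayList<>();
        tapRequests(range(1, 3), requests::add).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(2); }
            @Override public void onNext(Integer item) { subscription.request(1); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(recordRequests()); // prints [2, 1, 1, 1]
    }
}
```

The callback sees each request amount exactly as the subscriber issued it, which is why the tests below can use doOnRequest to observe how operators such as limitRate or switchOnFirst reshape demand.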

Code examples

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
  return getHttpClient()
      .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
      .websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
      .uri(url.toString())
      .handle((inbound, outbound) -> {
        HttpHeaders responseHeaders = toHttpHeaders(inbound);
        String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
        HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
        NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
        WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
        if (logger.isDebugEnabled()) {
          logger.debug("Started session '" + session.getId() + "' for " + url);
        }
        return handler.handle(session);
      })
      .doOnRequest(n -> {
        if (logger.isDebugEnabled()) {
          logger.debug("Connecting to " + url);
        }
      })
      .next();
}

Code example source: reactor/reactor-core

@Test
public void bufferSizeIsAlsoPrefetch() {
  AtomicLong requested = new AtomicLong();
  Flux.range(1, 10)
    .hide()
    .doOnRequest(r -> requested.compareAndSet(0, r))
    .filterWhen(v -> Mono.just(v % 2 == 0), 5)
    .subscribe().dispose();
  assertThat(requested.get()).isEqualTo(5);
}

Code example source: reactor/reactor-core

@Test
public void shouldNeverSendIncorrectRequestSizeToUpstream() throws InterruptedException {
  TestPublisher<Long> publisher = TestPublisher.createCold();
  AtomicLong capture = new AtomicLong();
  ArrayList<Long> requested = new ArrayList<>();
  CountDownLatch latch = new CountDownLatch(1);
  Flux<Long> switchTransformed = publisher.flux()
                      .doOnRequest(requested::add)
                      .switchOnFirst((first, innerFlux) -> innerFlux);
  publisher.next(1L);
  publisher.complete();
  switchTransformed.subscribe(capture::set, __ -> {}, latch::countDown, s -> s.request(1));
  latch.await(5, TimeUnit.SECONDS);
  Assertions.assertThat(capture.get()).isEqualTo(1L);
  Assertions.assertThat(requested).containsExactly(1L);
}

Code example source: reactor/reactor-core

@Test
public void shouldBeRequestedExactlyOneAndThenLongMaxValue() throws InterruptedException {
  TestPublisher<Long> publisher = TestPublisher.createCold();
  ArrayList<Long> capture = new ArrayList<>();
  ArrayList<Long> requested = new ArrayList<>();
  CountDownLatch latch = new CountDownLatch(1);
  Flux<Long> switchTransformed = publisher.flux()
                      .doOnRequest(requested::add)
                      .switchOnFirst((first, innerFlux) -> innerFlux);
  publisher.next(1L);
  publisher.complete();
  switchTransformed.subscribe(capture::add, __ -> {}, latch::countDown, s -> s.request(Long.MAX_VALUE));
  latch.await(5, TimeUnit.SECONDS);
  Assertions.assertThat(capture).containsExactly(1L);
  Assertions.assertThat(requested).containsExactly(1L, Long.MAX_VALUE);
}

Code example source: reactor/reactor-core

@Test
public void shouldBeRequestedExactlyOneAndThenLongMaxValueConditional() throws InterruptedException {
  TestPublisher<Long> publisher = TestPublisher.createCold();
  ArrayList<Long> capture = new ArrayList<>();
  ArrayList<Long> requested = new ArrayList<>();
  CountDownLatch latch = new CountDownLatch(1);
  Flux<Long> switchTransformed = publisher.flux()
                      .doOnRequest(requested::add)
                      .switchOnFirst((first, innerFlux) -> innerFlux);
  publisher.next(1L);
  publisher.complete();
  switchTransformed.subscribe(capture::add, __ -> {}, latch::countDown, s -> s.request(Long.MAX_VALUE));
  latch.await(5, TimeUnit.SECONDS);
  Assertions.assertThat(capture).containsExactly(1L);
  Assertions.assertThat(requested).containsExactly(1L, Long.MAX_VALUE);
}

Code example source: reactor/reactor-core

@Override
public Flux<T> flux() {
  return Flux.from(delegate)
        .doOnSubscribe(sub -> incrementAndGet(SUBSCRIBED))
        .doOnCancel(() -> incrementAndGet(CANCELLED))
        .doOnRequest(l -> incrementAndGet(REQUESTED));
}

Code example source: reactor/reactor-core

@Test
public void prefetchMaxRequestsUnbounded() {
  AtomicLong initialRequest = new AtomicLong();
  StepVerifier.create(Flux.range(1, 10)
              .doOnRequest(r -> initialRequest.compareAndSet(0L, r))
              .groupBy(i -> i % 5, Integer.MAX_VALUE)
              .concatMap(v -> v))
        .expectNextCount(10)
        .verifyComplete();
  assertThat(initialRequest.get()).isEqualTo(Long.MAX_VALUE);
}

Code example source: reactor/reactor-core

@Test
public void prefetchIsUsed() {
  AtomicLong initialRequest = new AtomicLong();
  StepVerifier.create(Flux.range(1, 10)
              .doOnRequest(r -> initialRequest.compareAndSet(0L, r))
              .groupBy(i -> i % 5, 11)
              .concatMap(v -> v))
        .expectNextCount(10)
        .verifyComplete();
  assertThat(initialRequest.get()).isEqualTo(11);
}

Code example source: reactor/reactor-core

@Test
public void shouldNeverSendIncorrectRequestSizeToUpstreamConditional() throws InterruptedException {
  TestPublisher<Long> publisher = TestPublisher.createCold();
  AtomicLong capture = new AtomicLong();
  ArrayList<Long> requested = new ArrayList<>();
  CountDownLatch latch = new CountDownLatch(1);
  Flux<Long> switchTransformed = publisher.flux()
                      .doOnRequest(requested::add)
                      .switchOnFirst((first, innerFlux) -> innerFlux)
                      .filter(e -> true);
  publisher.next(1L);
  publisher.complete();
  switchTransformed.subscribe(capture::set, __ -> {}, latch::countDown, s -> s.request(1));
  latch.await(5, TimeUnit.SECONDS);
  Assertions.assertThat(capture.get()).isEqualTo(1L);
  Assertions.assertThat(requested).containsExactly(1L);
}

Code example source: reactor/reactor-core

@Test
public void backpressureConditionalTest() {
  Flux<Integer> publisher = Flux.range(0, 10000);
  AtomicLong requested = new AtomicLong();
  Flux<String> switchTransformed = publisher.doOnRequest(requested::addAndGet)
                       .switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf))
                       .filter(e -> false);
  StepVerifier.create(switchTransformed, 0)
        .thenRequest(1)
        .expectComplete()
        .verify(Duration.ofSeconds(10));
  Assertions.assertThat(requested.get()).isEqualTo(2L);
}

Code example source: reactor/reactor-core

@Test
public void backpressureDrawbackOnConditionalInTransformTest() {
  Flux<Integer> publisher = Flux.range(0, 10000);
  AtomicLong requested = new AtomicLong();
  Flux<String> switchTransformed = publisher.doOnRequest(requested::addAndGet)
                       .switchOnFirst((first, innerFlux) -> innerFlux
                           .map(String::valueOf)
                           .filter(e -> false));
  StepVerifier.create(switchTransformed, 0)
        .thenRequest(1)
        .expectComplete()
        .verify(Duration.ofSeconds(10));
  Assertions.assertThat(requested.get()).isEqualTo(10001L);
}

Code example source: reactor/reactor-core

@Test
public void limitRateWithCloseLowTide() {
  List<Long> rebatchedRequest = Collections.synchronizedList(new ArrayList<>());
  final Flux<Integer> test = Flux
      .range(1, 14)
      .hide()
      .doOnRequest(rebatchedRequest::add)
      .limitRate(10,8);
  StepVerifier.create(test, 14)
        .expectNextCount(14)
        .verifyComplete();
  Assertions.assertThat(rebatchedRequest)
       .containsExactly(10L, 8L);
}

Code example source: reactor/reactor-core

@Test
public void backpressureHiddenConditionalTest() {
  Flux<Integer> publisher = Flux.range(0, 10000);
  AtomicLong requested = new AtomicLong();
  Flux<String> switchTransformed = publisher.doOnRequest(requested::addAndGet)
                       .switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf)
                                              .hide())
                       .filter(e -> false);
  StepVerifier.create(switchTransformed, 0)
        .thenRequest(1)
        .expectComplete()
        .verify(Duration.ofSeconds(10));
  Assertions.assertThat(requested.get()).isEqualTo(10001L);
}

Code example source: reactor/reactor-core

@Test
public void limitRateWithVeryLowTide() {
  List<Long> rebatchedRequest = Collections.synchronizedList(new ArrayList<>());
  final Flux<Integer> test = Flux
      .range(1, 14)
      .hide()
      .doOnRequest(rebatchedRequest::add)
      .limitRate(10,2);
  StepVerifier.create(test, 14)
        .expectNextCount(14)
        .verifyComplete();
  Assertions.assertThat(rebatchedRequest)
       .containsExactly(10L, 2L, 2L, 2L, 2L, 2L, 2L, 2L);
}
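
The two limitRate tests above assert upstream request sequences of 10, 8 and 10, 2, 2, 2, 2, 2, 2, 2. One way to read those numbers is the plain-Java model below. It rests on an assumption, not on Reactor's source: the first upstream request equals highTide, and a further request of lowTide is sent each time lowTide items have been consumed. LimitRateModel and upstreamRequests are hypothetical names, and the model assumes the downstream demands every item.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical arithmetic model of the request pattern asserted in the tests; not Reactor code.
final class LimitRateModel {

    // Assumption: request highTide up front, then lowTide after every lowTide consumed items.
    static List<Long> upstreamRequests(int items, int highTide, int lowTide) {
        List<Long> requests = new ArrayList<>();
        requests.add((long) highTide);
        for (int consumed = 1; consumed <= items; consumed++) {
            if (consumed % lowTide == 0) {
                requests.add((long) lowTide); // replenish once a full lowTide batch was consumed
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests(14, 10, 8)); // prints [10, 8]
        System.out.println(upstreamRequests(14, 10, 2)); // prints [10, 2, 2, 2, 2, 2, 2, 2]
    }
}
```

With 14 items and a lowTide of 8, only one replenishment fits before the source completes; with a lowTide of 2, seven do, which matches the values both tests expect.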

Code example source: reactor/reactor-core

@Test
public void prefetchMaxTranslatesToUnboundedRequest() {
  AtomicLong requested = new AtomicLong();
  StepVerifier.create(Flux.just(1, 2, 3).hide()
              .doOnRequest(requested::set)
              .concatMap(i -> Flux.range(0, i), Integer.MAX_VALUE))
        .expectNext(0, 0, 1, 0, 1, 2)
        .verifyComplete();
  assertThat(requested.get())
      .isNotEqualTo(Integer.MAX_VALUE)
      .isEqualTo(Long.MAX_VALUE);
}

Code example source: reactor/reactor-core

@Test
public void prefetchMaxTranslatesToUnboundedRequest2() {
  AtomicLong requested = new AtomicLong();
  StepVerifier.create(Flux.just(1, 2, 3).hide()
              .doOnRequest(requested::set)
              .concatMapDelayError(i -> Flux.range(0, i), Integer.MAX_VALUE))
        .expectNext(0, 0, 1, 0, 1, 2)
        .verifyComplete();
  assertThat(requested.get())
      .isNotEqualTo(Integer.MAX_VALUE)
      .isEqualTo(Long.MAX_VALUE);
}

Code example source: reactor/reactor-core

@Test
public void mergePublisherPublisher(){
  AtomicLong request = new AtomicLong();
  StepVerifier.create(Flux.merge(Flux.just(Flux.just(1, 2), Flux.just(3, 4)).doOnRequest(request::set)))
        .expectNext(1, 2, 3, 4)
        .then(() -> assertThat(request.get()).isEqualTo(1) )
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void apiCall() {
  LongAdder rCount = new LongAdder();
  final Flux<Integer> source = Flux.range(1, 100)
                   .doOnRequest(rCount::add);
  StepVerifier.create(source.limitRequest(3))
        .expectNext(1, 2, 3)
        .verifyComplete();
  assertThat(rCount.longValue()).isEqualTo(3);
}

Code example source: reactor/reactor-core

@Test
public void unboundedDownstreamRequest() {
  LongAdder rCount = new LongAdder();
  final Flux<Integer> source = Flux.range(1, 100)
                   .doOnRequest(rCount::add);
  Flux<Integer> test = new FluxLimitRequest<>(source, 3);
  StepVerifier.create(test, Long.MAX_VALUE)
        .expectNext(1, 2, 3)
        .verifyComplete();
  assertThat(rCount.longValue())
      .as("total request should match the limitRequest")
      .isEqualTo(3);
}

Code example source: reactor/reactor-core

@Test
public void extraneousSmallRequestsNotPropagatedAsZero() {
  List<Long> requests = new ArrayList<>();
  final Flux<Integer> source = Flux.range(1, 100)
                   .doOnRequest(requests::add);
  Flux<Integer> test = new FluxLimitRequest<>(source, 11);
  StepVerifier.create(test, 0)
        .thenRequest(8)
        .thenRequest(2)
        .thenRequest(2)
        .thenRequest(2)
        .thenRequest(2)
        .expectNextCount(11)
        .verifyComplete();
  assertThat(requests)
      .as("limitRequest should not propagate extraneous requests as zeros")
      .containsExactly(8L, 2L, 1L);
}
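
The last three tests all check that the total demand reaching the source never exceeds the limitRequest cap. The arithmetic they assert can be modelled in plain Java as follows. This is a sketch under an assumption, not Reactor's code: each downstream request is forwarded as the smaller of the requested amount and the remaining budget, and nothing is forwarded once the budget is used up. LimitRequestModel and forwarded are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical arithmetic model of a capped request budget; not Reactor code.
final class LimitRequestModel {

    // Returns the request amounts the source would see for the given downstream requests.
    static List<Long> forwarded(long cap, long... downstreamRequests) {
        List<Long> upstream = new ArrayList<>();
        long remaining = cap;
        for (long n : downstreamRequests) {
            long toForward = Math.min(n, remaining);
            if (toForward > 0) {          // exhausted budget: the request is swallowed, not sent as 0
                upstream.add(toForward);
                remaining -= toForward;
            }
        }
        return upstream;
    }

    public static void main(String[] args) {
        System.out.println(forwarded(11, 8, 2, 2, 2, 2));   // prints [8, 2, 1]
        System.out.println(forwarded(3, Long.MAX_VALUE));   // prints [3]
    }
}
```

The first call mirrors extraneousSmallRequestsNotPropagatedAsZero (cap 11, requests of 8, 2, 2, 2, 2), and the second mirrors unboundedDownstreamRequest (cap 3, unbounded downstream demand).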
