本文整理了Java中reactor.core.publisher.Flux.doOnRequest()
方法的一些代码示例,展示了Flux.doOnRequest()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.doOnRequest()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:doOnRequest
[英]Add behavior (side-effect) triggering a LongConsumer when this Fluxreceives any request.
Note that non fatal error raised in the callback will not be propagated and will simply trigger Operators#onOperatorError(Throwable,Context).
[中]添加此Flux收到任何请求时触发LongConsumer的行为(副作用)。
请注意,回调中引发的非致命错误不会传播,只会触发运算符#onOperatorError(Throwable,Context)。
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
return getHttpClient()
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
.websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
.uri(url.toString())
.handle((inbound, outbound) -> {
HttpHeaders responseHeaders = toHttpHeaders(inbound);
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
if (logger.isDebugEnabled()) {
logger.debug("Started session '" + session.getId() + "' for " + url);
}
return handler.handle(session);
})
.doOnRequest(n -> {
if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
})
.next();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void bufferSizeIsAlsoPrefetch() {
AtomicLong requested = new AtomicLong();
Flux.range(1, 10)
.hide()
.doOnRequest(r -> requested.compareAndSet(0, r))
.filterWhen(v -> Mono.just(v % 2 == 0), 5)
.subscribe().dispose();
assertThat(requested.get()).isEqualTo(5);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldNeverSendIncorrectRequestSizeToUpstream() throws InterruptedException {
TestPublisher<Long> publisher = TestPublisher.createCold();
AtomicLong capture = new AtomicLong();
ArrayList<Long> requested = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
Flux<Long> switchTransformed = publisher.flux()
.doOnRequest(requested::add)
.switchOnFirst((first, innerFlux) -> innerFlux);
publisher.next(1L);
publisher.complete();
switchTransformed.subscribe(capture::set, __ -> {}, latch::countDown, s -> s.request(1));
latch.await(5, TimeUnit.SECONDS);
Assertions.assertThat(capture.get()).isEqualTo(1L);
Assertions.assertThat(requested).containsExactly(1L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldBeRequestedExactlyOneAndThenLongMaxValue() throws InterruptedException {
TestPublisher<Long> publisher = TestPublisher.createCold();
ArrayList<Long> capture = new ArrayList<>();
ArrayList<Long> requested = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
Flux<Long> switchTransformed = publisher.flux()
.doOnRequest(requested::add)
.switchOnFirst((first, innerFlux) -> innerFlux);
publisher.next(1L);
publisher.complete();
switchTransformed.subscribe(capture::add, __ -> {}, latch::countDown, s -> s.request(Long.MAX_VALUE));
latch.await(5, TimeUnit.SECONDS);
Assertions.assertThat(capture).containsExactly(1L);
Assertions.assertThat(requested).containsExactly(1L, Long.MAX_VALUE);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldBeRequestedExactlyOneAndThenLongMaxValueConditional() throws InterruptedException {
TestPublisher<Long> publisher = TestPublisher.createCold();
ArrayList<Long> capture = new ArrayList<>();
ArrayList<Long> requested = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
Flux<Long> switchTransformed = publisher.flux()
.doOnRequest(requested::add)
.switchOnFirst((first, innerFlux) -> innerFlux);
publisher.next(1L);
publisher.complete();
switchTransformed.subscribe(capture::add, __ -> {}, latch::countDown, s -> s.request(Long.MAX_VALUE));
latch.await(5, TimeUnit.SECONDS);
Assertions.assertThat(capture).containsExactly(1L);
Assertions.assertThat(requested).containsExactly(1L, Long.MAX_VALUE);
}
代码示例来源:origin: reactor/reactor-core
@Override
public Flux<T> flux() {
return Flux.from(delegate)
.doOnSubscribe(sub -> incrementAndGet(SUBSCRIBED))
.doOnCancel(() -> incrementAndGet(CANCELLED))
.doOnRequest(l -> incrementAndGet(REQUESTED));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void prefetchMaxRequestsUnbounded() {
AtomicLong initialRequest = new AtomicLong();
StepVerifier.create(Flux.range(1, 10)
.doOnRequest(r -> initialRequest.compareAndSet(0L, r))
.groupBy(i -> i % 5, Integer.MAX_VALUE)
.concatMap(v -> v))
.expectNextCount(10)
.verifyComplete();
assertThat(initialRequest.get()).isEqualTo(Long.MAX_VALUE);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void prefetchIsUsed() {
AtomicLong initialRequest = new AtomicLong();
StepVerifier.create(Flux.range(1, 10)
.doOnRequest(r -> initialRequest.compareAndSet(0L, r))
.groupBy(i -> i % 5, 11)
.concatMap(v -> v))
.expectNextCount(10)
.verifyComplete();
assertThat(initialRequest.get()).isEqualTo(11);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldNeverSendIncorrectRequestSizeToUpstreamConditional() throws InterruptedException {
TestPublisher<Long> publisher = TestPublisher.createCold();
AtomicLong capture = new AtomicLong();
ArrayList<Long> requested = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
Flux<Long> switchTransformed = publisher.flux()
.doOnRequest(requested::add)
.switchOnFirst((first, innerFlux) -> innerFlux)
.filter(e -> true);
publisher.next(1L);
publisher.complete();
switchTransformed.subscribe(capture::set, __ -> {}, latch::countDown, s -> s.request(1));
latch.await(5, TimeUnit.SECONDS);
Assertions.assertThat(capture.get()).isEqualTo(1L);
Assertions.assertThat(requested).containsExactly(1L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void backpressureConditionalTest() {
Flux<Integer> publisher = Flux.range(0, 10000);
AtomicLong requested = new AtomicLong();
Flux<String> switchTransformed = publisher.doOnRequest(requested::addAndGet)
.switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf))
.filter(e -> false);
StepVerifier.create(switchTransformed, 0)
.thenRequest(1)
.expectComplete()
.verify(Duration.ofSeconds(10));
Assertions.assertThat(requested.get()).isEqualTo(2L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void backpressureDrawbackOnConditionalInTransformTest() {
Flux<Integer> publisher = Flux.range(0, 10000);
AtomicLong requested = new AtomicLong();
Flux<String> switchTransformed = publisher.doOnRequest(requested::addAndGet)
.switchOnFirst((first, innerFlux) -> innerFlux
.map(String::valueOf)
.filter(e -> false));
StepVerifier.create(switchTransformed, 0)
.thenRequest(1)
.expectComplete()
.verify(Duration.ofSeconds(10));
Assertions.assertThat(requested.get()).isEqualTo(10001L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void limitRateWithCloseLowTide() {
List<Long> rebatchedRequest = Collections.synchronizedList(new ArrayList<>());
final Flux<Integer> test = Flux
.range(1, 14)
.hide()
.doOnRequest(rebatchedRequest::add)
.limitRate(10,8);
StepVerifier.create(test, 14)
.expectNextCount(14)
.verifyComplete();
Assertions.assertThat(rebatchedRequest)
.containsExactly(10L, 8L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void backpressureHiddenConditionalTest() {
Flux<Integer> publisher = Flux.range(0, 10000);
AtomicLong requested = new AtomicLong();
Flux<String> switchTransformed = publisher.doOnRequest(requested::addAndGet)
.switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf)
.hide())
.filter(e -> false);
StepVerifier.create(switchTransformed, 0)
.thenRequest(1)
.expectComplete()
.verify(Duration.ofSeconds(10));
Assertions.assertThat(requested.get()).isEqualTo(10001L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void limitRateWithVeryLowTide() {
List<Long> rebatchedRequest = Collections.synchronizedList(new ArrayList<>());
final Flux<Integer> test = Flux
.range(1, 14)
.hide()
.doOnRequest(rebatchedRequest::add)
.limitRate(10,2);
StepVerifier.create(test, 14)
.expectNextCount(14)
.verifyComplete();
Assertions.assertThat(rebatchedRequest)
.containsExactly(10L, 2L, 2L, 2L, 2L, 2L, 2L, 2L);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void prefetchMaxTranslatesToUnboundedRequest() {
AtomicLong requested = new AtomicLong();
StepVerifier.create(Flux.just(1, 2, 3).hide()
.doOnRequest(requested::set)
.concatMap(i -> Flux.range(0, i), Integer.MAX_VALUE))
.expectNext(0, 0, 1, 0, 1, 2)
.verifyComplete();
assertThat(requested.get())
.isNotEqualTo(Integer.MAX_VALUE)
.isEqualTo(Long.MAX_VALUE);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void prefetchMaxTranslatesToUnboundedRequest2() {
AtomicLong requested = new AtomicLong();
StepVerifier.create(Flux.just(1, 2, 3).hide()
.doOnRequest(requested::set)
.concatMapDelayError(i -> Flux.range(0, i), Integer.MAX_VALUE))
.expectNext(0, 0, 1, 0, 1, 2)
.verifyComplete();
assertThat(requested.get())
.isNotEqualTo(Integer.MAX_VALUE)
.isEqualTo(Long.MAX_VALUE);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void mergePublisherPublisher(){
AtomicLong request = new AtomicLong();
StepVerifier.create(Flux.merge(Flux.just(Flux.just(1, 2), Flux.just(3, 4)).doOnRequest(request::set)))
.expectNext(1, 2, 3, 4)
.then(() -> assertThat(request.get()).isEqualTo(1) )
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void apiCall() {
LongAdder rCount = new LongAdder();
final Flux<Integer> source = Flux.range(1, 100)
.doOnRequest(rCount::add);
StepVerifier.create(source.limitRequest(3))
.expectNext(1, 2, 3)
.verifyComplete();
assertThat(rCount.longValue()).isEqualTo(3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void unboundedDownstreamRequest() {
LongAdder rCount = new LongAdder();
final Flux<Integer> source = Flux.range(1, 100)
.doOnRequest(rCount::add);
Flux<Integer> test = new FluxLimitRequest<>(source, 3);
StepVerifier.create(test, Long.MAX_VALUE)
.expectNext(1, 2, 3)
.verifyComplete();
assertThat(rCount.longValue())
.as("total request should match the limitRequest")
.isEqualTo(3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void extraneousSmallRequestsNotPropagatedAsZero() {
List<Long> requests = new ArrayList<>();
final Flux<Integer> source = Flux.range(1, 100)
.doOnRequest(requests::add);
Flux<Integer> test = new FluxLimitRequest<>(source, 11);
StepVerifier.create(test, 0)
.thenRequest(8)
.thenRequest(2)
.thenRequest(2)
.thenRequest(2)
.thenRequest(2)
.expectNextCount(11)
.verifyComplete();
assertThat(requests)
.as("limitRequest should not propagate extraneous requests as zeros")
.containsExactly(8L, 2L, 1L);
}
内容来源于网络,如有侵权,请联系作者删除!