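This article collects code examples for the Java method reactor.core.publisher.Flux.onBackpressureDrop() and shows how Flux.onBackpressureDrop() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of Flux.onBackpressureDrop() are as follows: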
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: onBackpressureDrop
Description: Request an unbounded demand and push to the returned Flux, or drop the observed elements if not enough demand is requested downstream.
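The description above can be modeled with the JDK's java.util.concurrent.Flow interfaces alone. The following is a minimal single-threaded sketch, not Reactor's implementation: the names DropSketch, DropStage, range, and run are hypothetical, and the choice to cancel upstream and signal onError when the drop callback throws is an assumption modeled on the behavior that the onDropThrows test in this article asserts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Illustrative model of "drop on backpressure"; NOT Reactor's implementation. */
public class DropSketch {

    /** Stage between a source and a subscriber: forwards while demand exists, drops otherwise. */
    static final class DropStage<T> implements Flow.Subscriber<T>, Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private final Consumer<? super T> onDropped;
        private Flow.Subscription upstream;
        private long demand;   // outstanding downstream demand (single-threaded sketch)
        private boolean done;

        DropStage(Flow.Subscriber<? super T> downstream, Consumer<? super T> onDropped) {
            this.downstream = downstream;
            this.onDropped = onDropped;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(this); // downstream requests from this stage
            s.request(Long.MAX_VALUE);    // this stage requests unbounded demand upstream
        }

        @Override public void onNext(T item) {
            if (done) return;
            if (demand > 0) {
                demand--;
                downstream.onNext(item);
                return;
            }
            try {
                onDropped.accept(item);   // no demand: hand the element to the drop callback
            } catch (RuntimeException e) {
                upstream.cancel();        // assumption: a failing callback ends the sequence
                onError(e);
            }
        }

        @Override public void onError(Throwable t) {
            if (!done) { done = true; downstream.onError(t); }
        }

        @Override public void onComplete() {
            if (!done) { done = true; downstream.onComplete(); }
        }

        @Override public void request(long n) {
            if (n <= 0) return;           // simplification: non-positive requests are ignored
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
        }

        @Override public void cancel() { done = true; upstream.cancel(); }
    }

    /** Hypothetical synchronous source: emits start..start+count-1 on the first request. */
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            boolean cancelled, started;
            @Override public void request(long n) {
                if (started) return;      // assumes the single request is unbounded
                started = true;
                for (int i = 0; i < count && !cancelled; i++) subscriber.onNext(start + i);
                if (!cancelled) subscriber.onComplete();
            }
            @Override public void cancel() { cancelled = true; }
        });
    }

    /** Sends range(1, count) through a DropStage; downstream requests initialRequest once. */
    public static List<String> run(int count, long initialRequest, Consumer<Integer> onDropped) {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<Integer> collector = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(initialRequest); }
            @Override public void onNext(Integer item) { events.add("next " + item); }
            @Override public void onError(Throwable t) { events.add("error " + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        range(1, count).subscribe(new DropStage<>(collector, onDropped));
        return events;
    }

    public static void main(String[] args) {
        List<Integer> dropped = new ArrayList<>();
        System.out.println(run(10, 3, dropped::add)); // [next 1, next 2, next 3, complete]
        System.out.println(dropped);                  // [4, 5, 6, 7, 8, 9, 10]
    }
}
```

With a downstream request of 3, the first three elements pass through and the remaining seven reach the drop callback. The source still completes normally because it was never slowed down, which is the trade-off this strategy makes: the producer runs at full speed and the consumer loses data instead of buffering it.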
Code example source: reactor/reactor-core
/**
 * Request an unbounded demand and push to the returned {@link Flux}, or emit onError
 * from {@link Exceptions#failWithOverflow} if not enough demand is requested
 * downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/onBackpressureError.svg" alt="">
 *
 * @reactor.discard This operator discards elements that it drops, after having propagated
 * the error.
 *
 * @return a backpressured {@link Flux} that errors on overflowing elements
 */
public final Flux<T> onBackpressureError() {
    return onBackpressureDrop(t -> { throw Exceptions.failWithOverflow(); });
}
Code example source: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void onDropNull() {
    Flux.never().onBackpressureDrop(null);
}
Code example source: reactor/reactor-core
@Test
public void onDropThrows() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 10)
        .onBackpressureDrop(e -> {
            throw new RuntimeException("forced failure");
        })
        .subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertError(RuntimeException.class)
      .assertErrorMessage("forced failure");
}
Code example source: reactor/reactor-core
@Test
public void onBackpressureDrop() {
    StepVerifier.create(Flux.range(1, 100)
                            .onBackpressureDrop(), 0)
                .thenRequest(5)
                .expectNext(1, 2, 3, 4, 5)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Ignore("delayElements test for local comparison run")
@Test
public void delayElements() {
    Flux<Tuple2<Long, Long>> test = Flux.interval(Duration.ofMillis(50))
                                        .onBackpressureDrop()
                                        .delayElements(Duration.ofMillis(500))
                                        .take(33)
                                        .elapsed()
                                        .log();
    StepVerifier.create(test)
                .thenConsumeWhile(t2 -> t2.getT1() >= 500)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void normal() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 10)
        .onBackpressureDrop()
        .subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void normalBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 10)
        .onBackpressureDrop()
        .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
@Ignore
public void test() throws Exception {
    processor = WorkQueueProcessor.builder().name("test-processor").bufferSize(RINGBUFFER_SIZE).build();
    Flux
        .create((emitter) -> burstyProducer(emitter, PRODUCED_MESSAGES_COUNT, BURST_SIZE))
        .onBackpressureDrop(this::incrementDroppedMessagesCounter)
        // .log("test", Level.INFO, SignalType.REQUEST)
        .subscribeWith(processor)
        .map(this::complicatedCalculation)
        .subscribe(this::logConsumedValue);
    waitForProducerFinish();
    System.out.println("\n\nMax ringbuffer pending: " + maxRingBufferPending.get());
    assertEquals(getDroppedMessagesCount(), 0, "Expect zero dropped messages");
}
Code example source: rsocket/rsocket-java
private static Flux<Long> input() {
    Flux<Long> interval = Flux.interval(Duration.ofMillis(1)).onBackpressureDrop();
    for (int i = 0; i < 10; i++) {
        interval = interval.mergeWith(interval);
    }
    return interval;
}
Code example source: rsocket/rsocket-java
@Test
public void testFluxOnly() {
    Flux<Long> longFlux = Flux.interval(Duration.ofMillis(1)).onBackpressureDrop();
    Flux.range(1, 60).flatMap(i -> longFlux.take(1000)).blockLast();
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param onDropped
 * @return
 * @see reactor.core.publisher.Flux#onBackpressureDrop(java.util.function.Consumer)
 */
public final Flux<T> onBackpressureDrop(Consumer<? super T> onDropped) {
    return boxed.onBackpressureDrop(onDropped);
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @return
 * @see reactor.core.publisher.Flux#onBackpressureDrop()
 */
public final Flux<T> onBackpressureDrop() {
    return boxed.onBackpressureDrop();
}
Code example source: rsocket/rsocket-java
client
.requestChannel(
input().onBackpressureDrop().map(iv -> DefaultPayload.create("foo")))
.limitRate(10000),
concurrency)
Code example source: bclozel/webflux-workshop
public Flux<Quote> fetchQuoteStream(Duration period) {
    // We want to emit quotes with a specific period;
    // to do so, we create a Flux.interval
    return Flux.interval(period)
            // In case of back-pressure, drop events
            .onBackpressureDrop()
            // For each tick, generate a list of quotes
            .map(this::generateQuotes)
            // "flatten" that List<Quote> into a Flux<Quote>
            .flatMapIterable(quotes -> quotes)
            .log("io.spring.workshop.stockquotes");
}
Code example source: io.rsocket.rpc/rsocket-rpc-core
@Override
public void run() {
    synchronized (this) {
        if (disposable != null) {
            return;
        }
    }
    this.disposable =
        Flux.interval(exportFrequency)
            .onBackpressureDrop()
            .concatMap(
                l ->
                    handler
                        .streamMetrics(getMetricsSnapshotStream(), Unpooled.EMPTY_BUFFER)
                        .doOnNext(this::recordClockSkew))
            .doOnError(throwable -> logger.debug("error streaming metrics", throwable))
            .retry()
            .subscribe();
}
Code example source: netifi-proteus/proteus-java
@Override
public Flux<SimpleResponse> serverStreamingRpc(SimpleRequest message, ByteBuf metadata) {
    String requestMessage = message.getRequestMessage();
    return Flux.interval(Duration.ofMillis(1))
        .publish()
        .refCount()
        .onBackpressureDrop()
        .map(i -> i + " - got message - " + requestMessage)
        .map(s -> SimpleResponse.newBuilder().setResponseMessage(s).build());
}
Code example source: org.apache.camel/camel-reactor
synchronized void attach(ReactiveStreamsProducer producer) {
    Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
    if (this.camelProducer != null) {
        throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
    }
    if (this.camelProducer != producer) {
        detach();
        ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();
        Flux<Exchange> flux = Flux.create(camelSink::set, FluxSink.OverflowStrategy.IGNORE);
        if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
            // signal item emitted for non-dropped items only
            flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
        } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
            // Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping
            // No exception is reported back to the exchanges
            flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
        } else {
            // Default strategy is BUFFER
            flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure).handle(this::onItemEmitted);
        }
        flux.subscribe(this.publisher);
        camelProducer = producer;
    }
}
Code example source: netifi-proteus/proteus-java
@Override
public Flux<Skew> streamMetrics(Publisher<MetricsSnapshot> messages, ByteBuf metadata) {
    DirectProcessor<Skew> processor = DirectProcessor.create();
    Disposable subscribe =
        Flux.from(messages)
            .limitRate(256, 32)
            .flatMapIterable(MetricsSnapshot::getMetersList)
            .flatMap(
                meter ->
                    Flux.fromIterable(meter.getMeasureList())
                        .doOnNext(meterMeasurement -> record(meter, meterMeasurement)))
            .doOnComplete(processor::onComplete)
            .doOnError(processor::onError)
            .subscribe();
    Flux.interval(Duration.ofSeconds(metricsSkewInterval))
        .map(l -> Skew.newBuilder().setTimestamp(System.currentTimeMillis()).build())
        .onBackpressureDrop()
        .doFinally(s -> subscribe.dispose())
        .subscribe(processor);
    return processor;
}