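This article collects code examples for the Java method reactor.core.publisher.Flux.onBackpressureLatest() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as reference material. Details of Flux.onBackpressureLatest() are as follows: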
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: onBackpressureLatest
Description: Request an unbounded demand and push to the returned Flux, or only keep the most recent observed item if not enough demand is requested downstream.
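The "keep only the most recent observed item" behavior in the description can be modeled without Reactor. The following is a minimal, single-threaded sketch using only the JDK. The names LatestSlotDemo, LatestSlot, and run are hypothetical and chosen for illustration; this is not Reactor's implementation, which also has to deal with concurrency, completion, and error signals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; not part of Reactor.
final class LatestSlotDemo {

    /** Single-threaded model of a "keep latest" backpressure slot. */
    static final class LatestSlot<T> {
        private final List<T> delivered = new ArrayList<>();
        private long demand; // outstanding downstream requests
        private T latest;    // at most one pending item

        /** Upstream pushes freely, mirroring the unbounded upstream request. */
        void onNext(T item) {
            if (demand > 0) {
                demand--;
                delivered.add(item);
            } else {
                latest = item; // overwrite: the older pending item is discarded
            }
        }

        /** Downstream asks for n more items; a pending item is delivered first. */
        void request(long n) {
            demand += n;
            if (latest != null && demand > 0) {
                demand--;
                delivered.add(latest);
                latest = null;
            }
        }

        List<T> delivered() {
            return delivered;
        }
    }

    static List<Integer> run() {
        LatestSlot<Integer> slot = new LatestSlot<>();
        slot.request(2);
        for (int i = 1; i <= 5; i++) {
            slot.onNext(i);  // 1 and 2 are delivered; 3 and 4 are overwritten; 5 is held
        }
        slot.request(1);     // delivers the held 5
        slot.onNext(6);      // no demand: held
        slot.onNext(7);      // replaces 6
        slot.request(3);     // delivers 7, leaving a demand of 2
        slot.onNext(8);      // demand available: delivered immediately
        return slot.delivered();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 5, 7, 8]
    }
}
```

Items 3, 4, and 6 never reach the consumer because a newer item replaced them while no demand was outstanding. Everything pushed while demand was available passes through unchanged.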
Code example source: origin: reactor/reactor-core
@Test
public void normal() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 10).onBackpressureLatest().subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .assertNoError()
      .assertComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void backpressuredComplex() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 10000000)
        .subscribeOn(Schedulers.parallel())
        .onBackpressureLatest()
        .publishOn(Schedulers.single())
        .concatMap(Mono::just, 1)
        .subscribe(ts);
    for (int i = 0; i < 1000000; i++) {
        ts.request(10);
    }
    ts.await();
    ts.assertNoError()
      .assertComplete();
}
Code example source: origin: com.aol.cyclops/cyclops-reactor
/**
 * @return
 * @see reactor.core.publisher.Flux#onBackpressureLatest()
 */
public final Flux<T> onBackpressureLatest() {
    return boxed.onBackpressureLatest();
}
Code example source: origin: io.projectreactor.kafka/reactor-kafka
private void start() {
    log.debug("start");
    if (!isActive.compareAndSet(false, true))
        throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
    fluxList.clear();
    requestsPending.set(0);
    consecutiveCommitFailures.set(0);
    awaitingTransaction.set(false);
    eventEmitter = EmitterProcessor.create();
    eventSubmission = eventEmitter.sink(OverflowStrategy.BUFFER);
    eventScheduler.start();
    Flux<InitEvent> initFlux = Flux.just(initEvent);
    fluxList.add(eventEmitter);
    fluxList.add(initFlux);
    Duration commitInterval = receiverOptions.commitInterval();
    if ((ackMode == AckMode.AUTO_ACK || ackMode == AckMode.MANUAL_ACK) && !commitInterval.isZero()) {
        Flux<CommitEvent> periodicCommitFlux = Flux.interval(receiverOptions.commitInterval())
                .onBackpressureLatest()
                .map(i -> commitEvent.periodicEvent());
        fluxList.add(periodicCommitFlux);
    }
    eventFlux = Flux.merge(fluxList)
            .publishOn(eventScheduler);
    subscribeDisposables.add(eventFlux.subscribe(event -> doEvent(event)));
}
Code example source: origin: org.apache.camel/camel-reactor
synchronized void attach(ReactiveStreamsProducer producer) {
    Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
    if (this.camelProducer != null) {
        throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
    }
    if (this.camelProducer != producer) {
        detach();
        ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();
        Flux<Exchange> flux = Flux.create(camelSink::set, FluxSink.OverflowStrategy.IGNORE);
        if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
            // signal item emitted for non-dropped items only
            flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
        } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
            // Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping
            // No exception is reported back to the exchanges
            flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
        } else {
            // Default strategy is BUFFER
            flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure).handle(this::onItemEmitted);
        }
        flux.subscribe(this.publisher);
        camelProducer = producer;
    }
}
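The comments in the Camel example explain why the operators are ordered differently for each strategy: the drop strategy reports each rejected item to a callback, while "latest" has no such callback, as the code comment states. The following JDK-only sketch models that difference under simplified, single-threaded assumptions. The names DropVersusLatest, Strategy, Outcome, and simulate are hypothetical and are not taken from Reactor or Camel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; not Reactor or Camel code.
final class DropVersusLatest {

    enum Strategy { DROP, LATEST }

    /** What downstream received, and which items a drop callback was told about. */
    static final class Outcome {
        final List<Integer> delivered = new ArrayList<>();
        final List<Integer> reported = new ArrayList<>();
    }

    /**
     * Pushes items 1..count after granting initialDemand, then grants lateDemand
     * once every item has been pushed.
     */
    static Outcome simulate(Strategy strategy, int count, long initialDemand, long lateDemand) {
        Outcome outcome = new Outcome();
        long demand = initialDemand;
        Integer pending = null; // only the LATEST strategy holds an item

        for (int item = 1; item <= count; item++) {
            if (demand > 0) {
                demand--;
                outcome.delivered.add(item);
            } else if (strategy == Strategy.DROP) {
                outcome.reported.add(item); // the callback sees every rejected item
            } else {
                pending = item;             // silently replaces the previous pending item
            }
        }

        demand += lateDemand;
        if (pending != null && demand > 0) {
            outcome.delivered.add(pending); // DROP never reaches here: it holds nothing
        }
        return outcome;
    }

    public static void main(String[] args) {
        Outcome drop = simulate(Strategy.DROP, 5, 2, 1);
        // prints DROP   delivered=[1, 2] reported=[3, 4, 5]
        System.out.println("DROP   delivered=" + drop.delivered + " reported=" + drop.reported);

        Outcome latest = simulate(Strategy.LATEST, 5, 2, 1);
        // prints LATEST delivered=[1, 2, 5] reported=[]
        System.out.println("LATEST delivered=" + latest.delivered + " reported=" + latest.reported);
    }
}
```

In this model the "latest" run discards items 3 and 4 without notifying anyone. That matches the reasoning in the Camel comment for signaling item emission before the operator rather than after it.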