reactor.core.publisher.Flux.onBackpressureLatest()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(4.6k)|赞(0)|评价(0)|浏览(213)

本文整理了Java中reactor.core.publisher.Flux.onBackpressureLatest()方法的一些代码示例,展示了Flux.onBackpressureLatest()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.onBackpressureLatest()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:onBackpressureLatest

Flux.onBackpressureLatest介绍

[英]Request an unbounded demand and push to the returned Flux, or only keep the most recent observed item if not enough demand is requested downstream.
[中]请求一个无限制的需求并推送到返回的流量,或者如果下游没有足够的需求,只保留最近观察到的项目。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10).onBackpressureLatest().subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void backpressuredComplex() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10000000)
    .subscribeOn(Schedulers.parallel())
    .onBackpressureLatest()
    .publishOn(Schedulers.single())
    .concatMap(Mono::just, 1)
    .subscribe(ts);
  for (int i = 0; i < 1000000; i++) {
    ts.request(10);
  }
  ts.await();
  ts
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @return
 * @see reactor.core.publisher.Flux#onBackpressureLatest()
 */
public final Flux<T> onBackpressureLatest() {
  return boxed.onBackpressureLatest();
}
/**

代码示例来源:origin: io.projectreactor.kafka/reactor-kafka

private void start() {
  log.debug("start");
  if (!isActive.compareAndSet(false, true))
    throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
  fluxList.clear();
  requestsPending.set(0);
  consecutiveCommitFailures.set(0);
  awaitingTransaction.set(false);
  eventEmitter = EmitterProcessor.create();
  eventSubmission = eventEmitter.sink(OverflowStrategy.BUFFER);
  eventScheduler.start();
  Flux<InitEvent> initFlux = Flux.just(initEvent);
  fluxList.add(eventEmitter);
  fluxList.add(initFlux);
  Duration commitInterval = receiverOptions.commitInterval();
  if ((ackMode == AckMode.AUTO_ACK || ackMode == AckMode.MANUAL_ACK) && !commitInterval.isZero()) {
    Flux<CommitEvent> periodicCommitFlux = Flux.interval(receiverOptions.commitInterval())
                          .onBackpressureLatest()
                          .map(i -> commitEvent.periodicEvent());
    fluxList.add(periodicCommitFlux);
  }
  eventFlux = Flux.merge(fluxList)
          .publishOn(eventScheduler);
  subscribeDisposables.add(eventFlux.subscribe(event -> doEvent(event)));
}

代码示例来源:origin: reactor/reactor-kafka

private void start() {
  log.debug("start");
  if (!isActive.compareAndSet(false, true))
    throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
  fluxList.clear();
  requestsPending.set(0);
  consecutiveCommitFailures.set(0);
  awaitingTransaction.set(false);
  eventEmitter = EmitterProcessor.create();
  eventSubmission = eventEmitter.sink(OverflowStrategy.BUFFER);
  eventScheduler.start();
  Flux<InitEvent> initFlux = Flux.just(initEvent);
  fluxList.add(eventEmitter);
  fluxList.add(initFlux);
  Duration commitInterval = receiverOptions.commitInterval();
  if ((ackMode == AckMode.AUTO_ACK || ackMode == AckMode.MANUAL_ACK) && !commitInterval.isZero()) {
    Flux<CommitEvent> periodicCommitFlux = Flux.interval(receiverOptions.commitInterval())
                          .onBackpressureLatest()
                          .map(i -> commitEvent.periodicEvent());
    fluxList.add(periodicCommitFlux);
  }
  eventFlux = Flux.merge(fluxList)
          .publishOn(eventScheduler);
  subscribeDisposables.add(eventFlux.subscribe(event -> doEvent(event)));
}

代码示例来源:origin: org.apache.camel/camel-reactor

synchronized void attach(ReactiveStreamsProducer producer) {
  Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
  if (this.camelProducer != null) {
    throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
  }
  if (this.camelProducer != producer) {
    detach();
    ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();
    Flux<Exchange> flux = Flux.create(camelSink::set, FluxSink.OverflowStrategy.IGNORE);
    if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
      // signal item emitted for non-dropped items only
      flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
    } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
      // Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping
      // No exception is reported back to the exchanges
      flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
    } else {
      // Default strategy is BUFFER
      flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure).handle(this::onItemEmitted);
    }
    flux.subscribe(this.publisher);
    camelProducer = producer;
  }
}

相关文章

Flux类方法