Usage and code examples for the reactor.core.publisher.Flux.onBackpressureDrop() method

x33g5p2x, reposted on 2022-01-19 under Other

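This article collects code examples for the Java method reactor.core.publisher.Flux.onBackpressureDrop() and shows how Flux.onBackpressureDrop() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as references. Details of the Flux.onBackpressureDrop() method:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: onBackpressureDrop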

Introduction to Flux.onBackpressureDrop

Request an unbounded demand and push to the returned Flux, or drop the observed elements if not enough demand is requested downstream. An overload that takes a Consumer is invoked with each dropped element, as several of the examples below show.
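The demand rule described above can be illustrated without Reactor. The following is a minimal single-threaded sketch in plain Java, assuming a hypothetical `DropGate` class (not part of Reactor and not its implementation): upstream pushes freely, each element is forwarded while downstream demand remains, and it is otherwise handed to a drop callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of "drop on backpressure".
// It only illustrates the demand rule; it is not Reactor's implementation.
final class DropGate<T> {
    private final Consumer<? super T> downstream; // receives forwarded elements
    private final Consumer<? super T> onDropped;  // receives dropped elements
    private long demand;                          // outstanding downstream requests

    DropGate(Consumer<? super T> downstream, Consumer<? super T> onDropped) {
        this.downstream = downstream;
        this.onDropped = onDropped;
    }

    // Downstream signals that it can accept n more elements.
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        long sum = demand + n;
        demand = sum < 0 ? Long.MAX_VALUE : sum; // cap instead of overflowing
    }

    // Upstream pushes without waiting: forward if there is demand, otherwise drop.
    void onNext(T item) {
        if (demand > 0) {
            if (demand != Long.MAX_VALUE) {
                demand--;
            }
            downstream.accept(item);
        } else {
            onDropped.accept(item);
        }
    }
}

final class DropGateDemo {
    // Pushes 1..count through a gate whose downstream requested `requested` elements up front.
    // Returns two lists: forwarded elements first, dropped elements second.
    static List<List<Integer>> run(int count, long requested) {
        List<Integer> received = new ArrayList<>();
        List<Integer> dropped = new ArrayList<>();
        DropGate<Integer> gate = new DropGate<Integer>(received::add, dropped::add);
        if (requested > 0) {
            gate.request(requested);
        }
        for (int i = 1; i <= count; i++) {
            gate.onNext(i);
        }
        return List.of(received, dropped);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run(10, 3);
        // prints: received=[1, 2, 3] dropped=[4, 5, 6, 7, 8, 9, 10]
        System.out.println("received=" + result.get(0) + " dropped=" + result.get(1));
    }
}
```

With zero demand, every element goes to the drop callback, which is the situation the backpressured tests in the examples exercise against the real operator.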

Code examples

Code example source: reactor/reactor-core

/**
 * Request an unbounded demand and push to the returned {@link Flux}, or emit onError
 * from {@link Exceptions#failWithOverflow} if not enough demand is requested
 * downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/onBackpressureError.svg" alt="">
 *
 * @reactor.discard This operator discards elements that it drops, after having propagated
 * the error.
 *
 * @return a backpressured {@link Flux} that errors on overflowing elements
 */
public final Flux<T> onBackpressureError() {
  return onBackpressureDrop(t -> { throw Exceptions.failWithOverflow();});
}
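The delegation above suggests that an overflow error is just a drop whose handler throws. Below is a rough plain-Java sketch of that idea, assuming a hypothetical `OverflowGate` class (an illustration only, not Reactor code): an exception thrown by the drop handler is captured as the terminal error, and later elements are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded model; names are illustrative, not Reactor API.
final class OverflowGate<T> {
    private final Consumer<? super T> onDropped;
    final List<T> received = new ArrayList<>(); // elements forwarded downstream
    Throwable error;                            // set once the gate has failed
    private long demand;                        // outstanding downstream requests

    OverflowGate(Consumer<? super T> onDropped) {
        this.onDropped = onDropped;
    }

    void request(long n) {
        demand += n;
    }

    void onNext(T item) {
        if (error != null) {
            return; // already terminated: ignore later elements
        }
        if (demand > 0) {
            demand--;
            received.add(item);
        } else {
            try {
                onDropped.accept(item); // a normal handler just observes the drop
            } catch (RuntimeException e) {
                error = e;              // a throwing handler terminates the sequence
            }
        }
    }
}

final class OverflowGateDemo {
    // Pushes 1..count through a gate whose drop handler always throws.
    static OverflowGate<Integer> run(int count, long requested) {
        OverflowGate<Integer> gate = new OverflowGate<Integer>(item -> {
            throw new IllegalStateException("overflow at " + item);
        });
        if (requested > 0) {
            gate.request(requested);
        }
        for (int i = 1; i <= count; i++) {
            gate.onNext(i);
        }
        return gate;
    }

    public static void main(String[] args) {
        OverflowGate<Integer> gate = run(10, 2);
        // prints: received=[1, 2] error=overflow at 3
        System.out.println("received=" + gate.received + " error=" + gate.error.getMessage());
    }
}
```

The first element that arrives without demand triggers the failure, so the error carries the position at which the consumer fell behind.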

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void onDropNull() {
  Flux.never().onBackpressureDrop(null);
}

Code example source: reactor/reactor-core

@Test
public void onDropThrows() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10)
    .onBackpressureDrop(e -> {
      throw new RuntimeException("forced failure");
    })
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}

Code example source: reactor/reactor-core

@Test
public void onBackpressureDrop() {
  StepVerifier.create(Flux.range(1, 100)
              .onBackpressureDrop(), 0)
        .thenRequest(5)
        .expectNext(1, 2, 3, 4, 5)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Ignore("delayElements test for local comparison run")
@Test
public void delayElements() {
  Flux<Tuple2<Long, Long>> test = Flux.interval(Duration.ofMillis(50))
                    .onBackpressureDrop()
                    .delayElements(Duration.ofMillis(500))
                    .take(33)
                    .elapsed()
                    .log();
  StepVerifier.create(test)
        .thenConsumeWhile(t2 -> t2.getT1() >= 500)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10)
    .onBackpressureDrop()
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void normalBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10)
    .onBackpressureDrop()
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
@Ignore
public void test() throws Exception {
  processor = WorkQueueProcessor.builder().name("test-processor").bufferSize(RINGBUFFER_SIZE).build();
  Flux
      .create((emitter) -> burstyProducer(emitter, PRODUCED_MESSAGES_COUNT, BURST_SIZE))
      .onBackpressureDrop(this::incrementDroppedMessagesCounter)
    //	.log("test", Level.INFO, SignalType.REQUEST)
      .subscribeWith(processor)
      .map(this::complicatedCalculation)
      .subscribe(this::logConsumedValue);
  waitForProducerFinish();
  System.out.println("\n\nMax ringbuffer pending: " + maxRingBufferPending.get());
  assertEquals(getDroppedMessagesCount(), 0, "Expect zero dropped messages");
}

Code example source: rsocket/rsocket-java

private static Flux<Long> input() {
 Flux<Long> interval = Flux.interval(Duration.ofMillis(1)).onBackpressureDrop();
 for (int i = 0; i < 10; i++) {
  interval = interval.mergeWith(interval);
 }
 return interval;
}

Code example source: rsocket/rsocket-java

@Test
public void testFluxOnly() {
  Flux<Long> longFlux = Flux.interval(Duration.ofMillis(1)).onBackpressureDrop();

  Flux.range(1, 60).flatMap(i -> longFlux.take(1000)).blockLast();
}

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @param onDropped
 * @return
 * @see reactor.core.publisher.Flux#onBackpressureDrop(java.util.function.Consumer)
 */
public final Flux<T> onBackpressureDrop(Consumer<? super T> onDropped) {
  return boxed.onBackpressureDrop(onDropped);
}

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @return
 * @see reactor.core.publisher.Flux#onBackpressureDrop()
 */
public final Flux<T> onBackpressureDrop() {
  return boxed.onBackpressureDrop();
}

Code example source: rsocket/rsocket-java

client
    .requestChannel(
      input().onBackpressureDrop().map(iv -> DefaultPayload.create("foo")))
    .limitRate(10000),
concurrency)

Code example source: bclozel/webflux-workshop

public Flux<Quote> fetchQuoteStream(Duration period) {
  // We want to emit quotes with a specific period;
  // to do so, we create a Flux.interval
  return Flux.interval(period)
      // In case of back-pressure, drop events
      .onBackpressureDrop()
      // For each tick, generate a list of quotes
      .map(this::generateQuotes)
      // "flatten" that List<Quote> into a Flux<Quote>
      .flatMapIterable(quotes -> quotes)
      .log("io.spring.workshop.stockquotes");
}

Code example source: io.rsocket.rpc/rsocket-rpc-core

@Override
 public void run() {
  synchronized (this) {
   if (disposable != null) {
    return;
   }
  }

  this.disposable =
    Flux.interval(exportFrequency)
      .onBackpressureDrop()
      .concatMap(
        l ->
          handler
            .streamMetrics(getMetricsSnapshotStream(), Unpooled.EMPTY_BUFFER)
            .doOnNext(this::recordClockSkew))
      .doOnError(throwable -> logger.debug("error streaming metrics", throwable))
      .retry()
      .subscribe();
 }

Code example source: netifi-proteus/proteus-java

@Override
public Flux<SimpleResponse> serverStreamingRpc(SimpleRequest message, ByteBuf metadata) {
 String requestMessage = message.getRequestMessage();
 return Flux.interval(Duration.ofMillis(1))
   .publish()
   .refCount()
   .onBackpressureDrop()
   .map(i -> i + " - got message - " + requestMessage)
   .map(s -> SimpleResponse.newBuilder().setResponseMessage(s).build());
}

Code example source: org.apache.camel/camel-reactor

synchronized void attach(ReactiveStreamsProducer producer) {
  Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
  if (this.camelProducer != null) {
    throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
  }
  if (this.camelProducer != producer) {
    detach();
    ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();
    Flux<Exchange> flux = Flux.create(camelSink::set, FluxSink.OverflowStrategy.IGNORE);
    if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
      // signal item emitted for non-dropped items only
      flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
    } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
      // Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping
      // No exception is reported back to the exchanges
      flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
    } else {
      // Default strategy is BUFFER
      flux = flux.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, this::onBackPressure).handle(this::onItemEmitted);
    }
    flux.subscribe(this.publisher);
    camelProducer = producer;
  }
}

Code example source: netifi-proteus/proteus-java

@Override
public Flux<Skew> streamMetrics(Publisher<MetricsSnapshot> messages, ByteBuf metadata) {
 DirectProcessor<Skew> processor = DirectProcessor.create();
 Disposable subscribe =
   Flux.from(messages)
     .limitRate(256, 32)
     .flatMapIterable(MetricsSnapshot::getMetersList)
     .flatMap(
       meter ->
         Flux.fromIterable(meter.getMeasureList())
           .doOnNext(meterMeasurement -> record(meter, meterMeasurement)))
     .doOnComplete(processor::onComplete)
     .doOnError(processor::onError)
     .subscribe();
 Flux.interval(Duration.ofSeconds(metricsSkewInterval))
   .map(l -> Skew.newBuilder().setTimestamp(System.currentTimeMillis()).build())
   .onBackpressureDrop()
   .doFinally(s -> subscribe.dispose())
   .subscribe(processor);
 return processor;
}
