Usage and code examples for the reactor.core.publisher.Flux.bufferTimeout() method

x33g5p2x, reposted on 2022-01-19 under Other

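This article collects code examples for the Java method reactor.core.publisher.Flux.bufferTimeout() and shows how Flux.bufferTimeout() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful reference material. Details of the Flux.bufferTimeout() method:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: bufferTimeout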

Introduction to Flux.bufferTimeout

Collects incoming values into multiple List buffers; the returned Flux emits a buffer each time it reaches the maximum size OR the maxTime Duration elapses.
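
As a quick illustration of the description above, here is a minimal sketch of size-triggered batching. The class name BufferTimeoutBasics and the helper batches() are hypothetical and not part of Reactor; the sketch assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class; only the Flux calls are Reactor API.
final class BufferTimeoutBasics {

    // Batches 1..10 into lists of at most 3 elements, or whatever arrived within 1 second.
    static List<List<Integer>> batches() {
        return Flux.range(1, 10)
                .bufferTimeout(3, Duration.ofSeconds(1))
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        // The source emits synchronously, so every full buffer closes on size;
        // the last, partial buffer is emitted when the source completes.
        System.out.println(batches()); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```

Because the source finishes well within the one-second window, the timeout never fires here; it only matters when elements arrive more slowly than maxTime.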

Code examples

Code example source: origin: reactor/reactor-core

/**
 * Collect incoming values into multiple {@link List} buffers that will be emitted
 * by the returned {@link Flux} each time the buffer reaches a maximum size OR the
 * maxTime {@link Duration} elapses.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 *
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 *
 * @return a microbatched {@link Flux} of {@link List} delimited by given size or a given period timeout
 */
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) {
  return bufferTimeout(maxSize, maxTime, listSupplier());
}

Code example source: origin: reactor/reactor-core

/**
 * Collect incoming values into multiple user-defined {@link Collection} buffers that
 * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
 * size OR the maxTime {@link Duration} elapses.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 *
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 * @param bufferSupplier a {@link Supplier} of the concrete {@link Collection} to use for each buffer
 * @param <C> the {@link Collection} buffer type
 * @return a microbatched {@link Flux} of {@link Collection} delimited by given size or a given period timeout
 */
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier) {
  return bufferTimeout(maxSize, maxTime, Schedulers.parallel(),
      bufferSupplier);
}
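
To make the bufferSupplier overload above concrete, the following sketch collects each batch into a TreeSet instead of a List. The class name SortedBuffers and the helper batches() are hypothetical; the input values are distinct on purpose, so the example does not depend on how duplicates would be handled.

```java
import java.time.Duration;
import java.util.List;
import java.util.TreeSet;

import reactor.core.publisher.Flux;

// Hypothetical demo class showing a user-defined Collection as the buffer type.
final class SortedBuffers {

    // Each buffer is a fresh TreeSet, so the elements of a batch come out sorted.
    static List<TreeSet<Integer>> batches() {
        Flux<TreeSet<Integer>> buffered = Flux.just(5, 3, 4, 1, 2)
                .bufferTimeout(3, Duration.ofSeconds(1), TreeSet::new);
        return buffered.collectList().block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(batches()); // [[3, 4, 5], [1, 2]]
    }
}
```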

Code example source: origin: reactor/reactor-core

/**
 * Collect incoming values into multiple {@link List} buffers that will be emitted
 * by the returned {@link Flux} each time the buffer reaches a maximum size OR the
 * maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 *
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return a microbatched {@link Flux} of {@link List} delimited by given size or a given period timeout
 */
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer) {
  return bufferTimeout(maxSize, maxTime, timer, listSupplier());
}
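
The Scheduler overload above can be sketched as follows, this time with a flush triggered by the timer rather than by size. The class name TimerSchedulerDemo, the helper firstBatch(), and the scheduler name "buffer-timer" are hypothetical choices made for illustration.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class: the timeout is measured on a dedicated Scheduler.
final class TimerSchedulerDemo {

    static List<Integer> firstBatch() {
        Scheduler timer = Schedulers.newSingle("buffer-timer"); // name chosen for this sketch
        try {
            return Flux.just(1, 2, 3)
                    // Never completes, so the buffer can only be released by the timer.
                    .concatWith(Mono.never())
                    .bufferTimeout(10, Duration.ofMillis(100), timer)
                    // Waits for the first buffer, then cancels the subscription.
                    .blockFirst(Duration.ofSeconds(5));
        } finally {
            timer.dispose(); // release the scheduler's thread
        }
    }

    public static void main(String[] args) {
        System.out.println(firstBatch()); // [1, 2, 3]
    }
}
```

The buffer holds only three of the ten allowed elements, so it is the 100 ms timeout, measured on the supplied Scheduler, that releases it.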

Code example source: origin: reactor/reactor-core

Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize() {
  return Flux.range(1, 6)
        .delayElements(Duration.ofMillis(300))
        .bufferTimeout(5, Duration.ofMillis(2000));
}

Code example source: origin: reactor/reactor-core

Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize2() {
  return Flux.range(1, 6)
        .delayElements(Duration.ofMillis(300))
        .bufferTimeout(5, Duration.ofMillis(2000));
}

Code example source: origin: reactor/reactor-core

Flux<List<Integer>> scenario_bufferWithTimeoutThrowingExceptionOnTimeOrSizeIfDownstreamDemandIsLow() {
  return Flux.range(1, 6)
        .delayElements(Duration.ofMillis(300))
        .bufferTimeout(5, Duration.ofMillis(100));
}

Code example source: origin: reactor/reactor-core

@Test
public void scanOperator() {
  final Flux<List<Integer>> flux = Flux.just(1).bufferTimeout(3, Duration.ofSeconds(1));
  assertThat(flux).isInstanceOf(Scannable.class);
  assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.parallel());
}

Code example source: origin: reactor/reactor-core

.groups()
.subscribe(stream -> stream.publishOn(asyncGroup)
             .bufferTimeout(1000 / 8, Duration.ofSeconds(1))
             .subscribe(batch -> {
               for (int j = 0; j < batch.size(); j++) {

Code example source: origin: reactor/reactor-core

.groups()
.subscribe(substream -> substream.hide().publishOn(asyncGroup)
               .bufferTimeout(BATCH_SIZE, Duration.ofMillis(TIMEOUT))
               .subscribe(items -> {
                 batchesDistribution.compute(items.size(),

Code example source: origin: reactor/reactor-core

@Test
public void rejectedOnNextLeadsToOnError() {
  Scheduler scheduler = Schedulers.newSingle("rejectedOnNextLeadsToOnError");
  scheduler.dispose();
  StepVerifier.create(Flux.just(1, 2, 3)
              .bufferTimeout(4, Duration.ofMillis(500), scheduler))
        .expectError(RejectedExecutionException.class)
        .verify(Duration.ofSeconds(1));
}

Code example source: origin: reactor/reactor-core

@Test
public void discardOnTimerRejected() {
  Scheduler scheduler = Schedulers.newSingle("discardOnTimerRejected");
  StepVerifier.create(Flux.just(1, 2, 3)
              .doOnNext(n -> scheduler.dispose())
              .bufferTimeout(10, Duration.ofMillis(100), scheduler))
        .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(RejectedExecutionException.class))
        .verifyThenAssertThat()
        .hasDiscardedExactly(1);
}

Code example source: origin: reactor/reactor-core

@Test
public void discardOnError() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.error(new IllegalStateException("boom")))
              .bufferTimeout(10, Duration.ofMillis(100)))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

Code example source: origin: reactor/reactor-core

@Test
public void discardOnCancel() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatWith(Mono.never())
              .bufferTimeout(10, Duration.ofMillis(100)))
        .thenAwait(Duration.ofMillis(10))
        .thenCancel()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}
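
Outside StepVerifier, the discard behaviour exercised by the tests above can be observed with a doOnDiscard hook. The sketch below is an illustration under that assumption, not code from reactor-core; the class name DiscardHookDemo and the helper discardedOnError() are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class: records the elements of an open buffer that is dropped on error.
final class DiscardHookDemo {

    static List<Integer> discardedOnError() {
        List<Integer> discarded = new CopyOnWriteArrayList<>();
        Flux.just(1, 2, 3)
                .concatWith(Mono.error(new IllegalStateException("boom")))
                .bufferTimeout(10, Duration.ofMillis(100))
                // The hook sits downstream so that bufferTimeout can see it when it discards.
                .doOnDiscard(Integer.class, discarded::add)
                // Swallow the error so the blocking call below returns normally.
                .onErrorResume(e -> Flux.empty())
                .blockLast(Duration.ofSeconds(5));
        return discarded;
    }

    public static void main(String[] args) {
        System.out.println(discardedOnError()); // [1, 2, 3]
    }
}
```

The error arrives before the buffer reaches ten elements or the timeout fires, so the three buffered values are never emitted and reach the hook instead.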

Code example source: origin: reactor/reactor-core

@Test
public void shouldCorrectlyDispatchBatchedTimeout() throws InterruptedException {
  long timeout = 100;
  final int batchsize = 4;
  int parallelStreams = 16;
  CountDownLatch latch = new CountDownLatch(1);
  final EmitterProcessor<Integer> streamBatcher = EmitterProcessor.create();
  streamBatcher.publishOn(asyncGroup)
         .bufferTimeout(batchsize, Duration.ofSeconds(timeout))
         .log("batched")
         .parallel(parallelStreams)
         .groups()
         .log("batched-inner")
         .subscribe(innerStream -> innerStream.publishOn(asyncGroup)
                          .doOnError(Throwable::printStackTrace)
                          .subscribe(i -> latch.countDown()));
  streamBatcher.onNext(12);
  streamBatcher.onNext(123);
  streamBatcher.onNext(42);
  streamBatcher.onNext(666);
  boolean finished = latch.await(2, TimeUnit.SECONDS);
  if (!finished) {
    throw new RuntimeException(latch.getCount()+"");
  }
  else {
    assertEquals("Must have correct latch number : " + latch.getCount(), 0, latch.getCount());
  }
}

Code example source: origin: reactor/reactor-core

@Test
public void discardOnFlushWithoutRequest() {
  TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW);
  StepVerifier.create(testPublisher
          .flux()
          .bufferTimeout(10, Duration.ofMillis(200)),
      StepVerifierOptions.create().initialRequest(0))
        .then(() -> testPublisher.emit(1, 2, 3))
        .thenAwait(Duration.ofMillis(250))
        .expectErrorMatches(Exceptions::isOverflow)
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

Code example source: origin: reactor/reactor-core

@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
  Flux<String> otherStream = Flux.just("test", "test2", "test3");
  // System.out.println("Providing new downstream");
  FluxProcessor<Long, Long> p =
      WorkQueueProcessor.<Long>builder().name("fluxion-raw-fork").bufferSize(bufferSize).build();

  cumulated.set(0);
  cumulatedJoin.set(0);

  BiFunction<Long, String, Long> combinator = (t1, t2) -> t1;
  return FluxProcessor.wrap(p,
      p.groupBy(k -> k % 2 == 0)
       .flatMap(stream -> stream.scan((prev, next) -> next)
                   .map(integer -> -integer)
                   .filter(integer -> integer <= 0)
                   .map(integer -> -integer)
                   .bufferTimeout(1024, Duration.ofMillis(50))
                   .flatMap(Flux::fromIterable)
                   .doOnNext(array -> cumulated.getAndIncrement())
                   .flatMap(i -> Flux.zip(Flux.just(i),
                       otherStream,
                       combinator)))
       .doOnNext(array -> cumulatedJoin.getAndIncrement())
       .subscribeWith(TopicProcessor.<Long>builder().name("fluxion-raw-join").bufferSize(bufferSize).build())
       .doOnError(Throwable::printStackTrace));
}

Code example source: origin: reactor/reactor-core

@Override
Flux<Integer> transformFlux(Flux<Integer> f) {
  Flux<String> otherStream = Flux.just("test", "test2", "test3");
  // System.out.println("Providing new downstream");

  Scheduler asyncGroup = Schedulers.newParallel("flux-p-tck", 2);

  BiFunction<Integer, String, Integer> combinator = (t1, t2) -> t1;

  return f.publishOn(sharedGroup)
      .parallel(2)
      .groups()
      .flatMap(stream -> stream.publishOn(asyncGroup)
                   .doOnNext(this::monitorThreadUse)
                   .scan((prev, next) -> next)
                   .map(integer -> -integer)
                   .filter(integer -> integer <= 0)
                   .map(integer -> -integer)
                   .bufferTimeout(batch, Duration.ofMillis(50))
                   .flatMap(Flux::fromIterable)
                   .flatMap(i -> Flux.zip(Flux.just(i), otherStream, combinator))
       )
      .publishOn(sharedGroup)
      .doAfterTerminate(asyncGroup::dispose)
      .doOnError(Throwable::printStackTrace);
}
