This article collects code examples for the Java method reactor.core.publisher.Flux.bufferTimeout() and shows how it is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects. Details of Flux.bufferTimeout() are as follows:
Fully qualified class: reactor.core.publisher.Flux
Class name: Flux
Method name: bufferTimeout
Description: Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.
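The size-or-timeout rule described above can be modelled without Reactor. The following is a minimal sketch in plain Java, not Reactor's implementation: the class `BufferTimeoutModel` and its `batches` method are hypothetical names, and the sketch assumes that the timeout for a buffer starts when that buffer receives its first element and that the source completes right after the last element, which flushes any partial buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of size-or-timeout batching; not part of Reactor.
class BufferTimeoutModel {

    /**
     * Splits the values 1..n into batches, given their arrival times in
     * milliseconds (ascending). A batch closes when it holds maxSize values
     * or when maxTimeMs has elapsed since its first value arrived
     * (an assumption of this sketch).
     */
    static List<List<Integer>> batches(long[] arrivalsMs, int maxSize, long maxTimeMs) {
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long deadline = 0;
        for (int i = 0; i < arrivalsMs.length; i++) {
            // The timeout expired before this value arrived: flush the partial batch.
            if (!current.isEmpty() && arrivalsMs[i] >= deadline) {
                out.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                // The first value of a new batch arms the timeout.
                deadline = arrivalsMs[i] + maxTimeMs;
            }
            current.add(i + 1);
            if (current.size() == maxSize) {
                // Size boundary reached: flush immediately.
                out.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            out.add(current); // completion flushes whatever is left
        }
        return out;
    }

    public static void main(String[] args) {
        long[] every300ms = {300, 600, 900, 1200, 1500, 1800};
        System.out.println(batches(every300ms, 5, 2000)); // [[1, 2, 3, 4, 5], [6]]
        System.out.println(batches(every300ms, 5, 100));  // [[1], [2], [3], [4], [5], [6]]
    }
}
```

The two calls in `main` use the same timings as the reactor-core test scenarios listed further down (six values, 300 ms apart): with a 2000 ms timeout the size boundary wins, and with a 100 ms timeout every batch is closed by the timer.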
Code example source: reactor/reactor-core
/**
 * Collect incoming values into multiple {@link List} buffers that will be emitted
 * by the returned {@link Flux} each time the buffer reaches a maximum size OR the
 * maxTime {@link Duration} elapses.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 *
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 *
 * @return a microbatched {@link Flux} of {@link List} delimited by given size or a given period timeout
 */
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) {
    return bufferTimeout(maxSize, maxTime, listSupplier());
}
Code example source: reactor/reactor-core
/**
 * Collect incoming values into multiple user-defined {@link Collection} buffers that
 * will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
 * size OR the maxTime {@link Duration} elapses.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 *
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 * @param bufferSupplier a {@link Supplier} of the concrete {@link Collection} to use for each buffer
 * @param <C> the {@link Collection} buffer type
 * @return a microbatched {@link Flux} of {@link Collection} delimited by given size or a given period timeout
 */
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier) {
    return bufferTimeout(maxSize, maxTime, Schedulers.parallel(),
            bufferSupplier);
}
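The bounded type parameter `<C extends Collection<? super T>>` and the `bufferSupplier` argument let the caller choose the collection used for each buffer. A minimal plain-Java sketch of that idea follows. `SupplierBatching` and `bySize` are hypothetical names, the sketch only handles the size boundary (no timeout), and it assumes the boundary is counted in received elements rather than in the collection's own size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of Reactor.
class SupplierBatching {

    /** Groups the source into buffers of maxSize received elements, each created by the supplier. */
    static <T, C extends Collection<? super T>> List<C> bySize(
            Iterable<T> source, int maxSize, Supplier<C> bufferSupplier) {
        List<C> out = new ArrayList<>();
        C current = null;
        int received = 0;
        for (T item : source) {
            if (current == null) {
                current = bufferSupplier.get(); // one fresh collection per buffer
            }
            current.add(item);
            received++;
            if (received == maxSize) {
                out.add(current);
                current = null;
                received = 0;
            }
        }
        if (current != null) {
            out.add(current); // trailing partial buffer
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 1, 2, 3, 3, 4);
        List<List<Integer>> lists = bySize(source, 3, ArrayList::new);
        List<Set<Integer>> sets = bySize(source, 3, LinkedHashSet::new);
        System.out.println(lists); // [[1, 1, 2], [3, 3, 4]]
        System.out.println(sets);  // [[1, 2], [3, 4]]
    }
}
```

Supplying a `Set` shows why the choice of collection matters: duplicates collapse inside each buffer, so a buffer can end up holding fewer distinct values than the number of elements it received.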
Code example source: reactor/reactor-core
/**
 * Collect incoming values into multiple {@link List} buffers that will be emitted
 * by the returned {@link Flux} each time the buffer reaches a maximum size OR the
 * maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
 * <p>
 * <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
 *
 * @reactor.discard This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
 *
 * @param maxSize the max collected size
 * @param maxTime the timeout enforcing the release of a partial buffer
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return a microbatched {@link Flux} of {@link List} delimited by given size or a given period timeout
 */
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer) {
    return bufferTimeout(maxSize, maxTime, timer, listSupplier());
}
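The `timer` parameter decides where the timeout is measured and where timed flushes run. As a rough analogy outside Reactor, the sketch below gives a `ScheduledExecutorService` that role. `TimedBatcher` and its methods are hypothetical and are not how Reactor implements the operator; the sketch ignores backpressure and assumes the timeout is armed by the first element of each buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not a Reactor class.
class TimedBatcher<T> {
    private final int maxSize;
    private final long maxTimeMs;
    private final ScheduledExecutorService timer; // plays the role of the "timer" argument
    private final Consumer<List<T>> downstream;
    private List<T> buffer = new ArrayList<>();
    private ScheduledFuture<?> pendingFlush;
    private long generation; // identifies the buffer a timeout was armed for

    TimedBatcher(int maxSize, long maxTimeMs, ScheduledExecutorService timer,
                 Consumer<List<T>> downstream) {
        this.maxSize = maxSize;
        this.maxTimeMs = maxTimeMs;
        this.timer = timer;
        this.downstream = downstream;
    }

    synchronized void add(T item) {
        if (buffer.isEmpty()) {
            // The first element of a new buffer arms the timeout on the supplied timer.
            long armedFor = generation;
            pendingFlush = timer.schedule(() -> onTimeout(armedFor), maxTimeMs, TimeUnit.MILLISECONDS);
        }
        buffer.add(item);
        if (buffer.size() >= maxSize) {
            flush(); // size boundary reached
        }
    }

    private synchronized void onTimeout(long armedFor) {
        // Ignore a stale timeout whose buffer was already flushed by size.
        if (armedFor == generation) {
            flush();
        }
    }

    synchronized void flush() {
        generation++;
        if (pendingFlush != null) {
            pendingFlush.cancel(false);
            pendingFlush = null;
        }
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        downstream.accept(batch);
    }

    /** Sends 1..4 with maxSize 3 and a 200 ms timeout, then waits for two batches. */
    static List<List<Integer>> demo() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<List<Integer>> received = new CopyOnWriteArrayList<>();
        CountDownLatch twoBatches = new CountDownLatch(2);
        TimedBatcher<Integer> batcher = new TimedBatcher<>(3, 200, timer, batch -> {
            received.add(batch);
            twoBatches.countDown();
        });
        try {
            batcher.add(1);
            batcher.add(2);
            batcher.add(3); // third element: flushed by size
            batcher.add(4); // left alone in its buffer: flushed by the timer
            twoBatches.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo the first three values are delivered together on the calling thread, while the fourth is delivered later on the timer's thread. That split is the reason the real operator lets the caller pick the scheduler that the timeout runs on.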
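Code example source: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize() {
    // Six values arrive 300 ms apart, so the size limit of 5 is reached
    // before the 2000 ms timeout; the sixth value is released on completion.
    return Flux.range(1, 6)
               .delayElements(Duration.ofMillis(300))
               .bufferTimeout(5, Duration.ofMillis(2000));
}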
Code example source: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize2() {
    return Flux.range(1, 6)
               .delayElements(Duration.ofMillis(300))
               .bufferTimeout(5, Duration.ofMillis(2000));
}
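Code example source: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWithTimeoutThrowingExceptionOnTimeOrSizeIfDownstreamDemandIsLow() {
    // The 100 ms timeout is shorter than the 300 ms gap between values,
    // so every buffer is closed by the timer rather than by size.
    return Flux.range(1, 6)
               .delayElements(Duration.ofMillis(300))
               .bufferTimeout(5, Duration.ofMillis(100));
}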
Code example source: reactor/reactor-core
@Test
public void scanOperator() {
    final Flux<List<Integer>> flux = Flux.just(1).bufferTimeout(3, Duration.ofSeconds(1));
    assertThat(flux).isInstanceOf(Scannable.class);
    // Without an explicit timer, the operator reports Schedulers.parallel() as RUN_ON.
    assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.parallel());
}
Code example source: reactor/reactor-core
.groups()
.subscribe(stream -> stream.publishOn(asyncGroup)
                           .bufferTimeout(1000 / 8, Duration.ofSeconds(1))
                           .subscribe(batch -> {
                               for (int j = 0; j < batch.size(); j++) {
Code example source: reactor/reactor-core
.groups()
.subscribe(substream -> substream.hide().publishOn(asyncGroup)
                                 .bufferTimeout(BATCH_SIZE, Duration.ofMillis(TIMEOUT))
                                 .subscribe(items -> {
                                     batchesDistribution.compute(items.size(),
Code example source: reactor/reactor-core
@Test
public void rejectedOnNextLeadsToOnError() {
    Scheduler scheduler = Schedulers.newSingle("rejectedOnNextLeadsToOnError");
    // The timer is disposed up front, so scheduling the timeout is rejected.
    scheduler.dispose();
    StepVerifier.create(Flux.just(1, 2, 3)
                            .bufferTimeout(4, Duration.ofMillis(500), scheduler))
                .expectError(RejectedExecutionException.class)
                .verify(Duration.ofSeconds(1));
}
Code example source: reactor/reactor-core
@Test
public void discardOnTimerRejected() {
    Scheduler scheduler = Schedulers.newSingle("discardOnTimerRejected");
    StepVerifier.create(Flux.just(1, 2, 3)
                            // Disposing the timer on the first value makes the timeout scheduling fail.
                            .doOnNext(n -> scheduler.dispose())
                            .bufferTimeout(10, Duration.ofMillis(100), scheduler))
                .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(RejectedExecutionException.class))
                .verifyThenAssertThat()
                .hasDiscardedExactly(1);
}
Code example source: reactor/reactor-core
@Test
public void discardOnError() {
    // The error arrives while the buffer is still open, so 1, 2 and 3 are discarded.
    StepVerifier.create(Flux.just(1, 2, 3)
                            .concatWith(Mono.error(new IllegalStateException("boom")))
                            .bufferTimeout(10, Duration.ofMillis(100)))
                .expectErrorMessage("boom")
                .verifyThenAssertThat()
                .hasDiscardedExactly(1, 2, 3);
}
Code example source: reactor/reactor-core
@Test
public void discardOnCancel() {
    // Cancelling after 10 ms, before the 100 ms timeout, discards the open buffer.
    StepVerifier.create(Flux.just(1, 2, 3)
                            .concatWith(Mono.never())
                            .bufferTimeout(10, Duration.ofMillis(100)))
                .thenAwait(Duration.ofMillis(10))
                .thenCancel()
                .verifyThenAssertThat()
                .hasDiscardedExactly(1, 2, 3);
}
Code example source: reactor/reactor-core
@Test
public void shouldCorrectlyDispatchBatchedTimeout() throws InterruptedException {
    long timeout = 100;
    final int batchsize = 4;
    int parallelStreams = 16;
    CountDownLatch latch = new CountDownLatch(1);
    final EmitterProcessor<Integer> streamBatcher = EmitterProcessor.create();
    streamBatcher.publishOn(asyncGroup)
                 .bufferTimeout(batchsize, Duration.ofSeconds(timeout))
                 .log("batched")
                 .parallel(parallelStreams)
                 .groups()
                 .log("batched-inner")
                 .subscribe(innerStream -> innerStream.publishOn(asyncGroup)
                                                      .doOnError(Throwable::printStackTrace)
                                                      .subscribe(i -> latch.countDown()));
    // Four values fill one batch of size 4, so it is emitted by size, not by timeout.
    streamBatcher.onNext(12);
    streamBatcher.onNext(123);
    streamBatcher.onNext(42);
    streamBatcher.onNext(666);
    boolean finished = latch.await(2, TimeUnit.SECONDS);
    if (!finished) {
        throw new RuntimeException(latch.getCount() + "");
    }
    else {
        assertEquals("Must have correct latch number : " + latch.getCount(), latch.getCount(), 0);
    }
}
Code example source: reactor/reactor-core
@Test
public void discardOnFlushWithoutRequest() {
    TestPublisher<Integer> testPublisher = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW);
    // With an initial request of 0, the timed flush has no demand to satisfy and overflows.
    StepVerifier.create(testPublisher
                    .flux()
                    .bufferTimeout(10, Duration.ofMillis(200)),
            StepVerifierOptions.create().initialRequest(0))
                .then(() -> testPublisher.emit(1, 2, 3))
                .thenAwait(Duration.ofMillis(250))
                .expectErrorMatches(Exceptions::isOverflow)
                .verifyThenAssertThat()
                .hasDiscardedExactly(1, 2, 3);
}
Code example source: reactor/reactor-core
@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
    Flux<String> otherStream = Flux.just("test", "test2", "test3");
    // System.out.println("Providing new downstream");
    FluxProcessor<Long, Long> p =
            WorkQueueProcessor.<Long>builder().name("fluxion-raw-fork").bufferSize(bufferSize).build();
    cumulated.set(0);
    cumulatedJoin.set(0);
    BiFunction<Long, String, Long> combinator = (t1, t2) -> t1;
    return FluxProcessor.wrap(p,
            p.groupBy(k -> k % 2 == 0)
             .flatMap(stream -> stream.scan((prev, next) -> next)
                                      .map(integer -> -integer)
                                      .filter(integer -> integer <= 0)
                                      .map(integer -> -integer)
                                      .bufferTimeout(1024, Duration.ofMillis(50))
                                      .flatMap(Flux::fromIterable)
                                      .doOnNext(array -> cumulated.getAndIncrement())
                                      .flatMap(i -> Flux.zip(Flux.just(i),
                                              otherStream,
                                              combinator)))
             .doOnNext(array -> cumulatedJoin.getAndIncrement())
             .subscribeWith(TopicProcessor.<Long>builder().name("fluxion-raw-join").bufferSize(bufferSize).build())
             .doOnError(Throwable::printStackTrace));
}
Code example source: reactor/reactor-core
@Override
Flux<Integer> transformFlux(Flux<Integer> f) {
    Flux<String> otherStream = Flux.just("test", "test2", "test3");
    // System.out.println("Providing new downstream");
    Scheduler asyncGroup = Schedulers.newParallel("flux-p-tck", 2);
    BiFunction<Integer, String, Integer> combinator = (t1, t2) -> t1;
    return f.publishOn(sharedGroup)
            .parallel(2)
            .groups()
            .flatMap(stream -> stream.publishOn(asyncGroup)
                                     .doOnNext(this::monitorThreadUse)
                                     .scan((prev, next) -> next)
                                     .map(integer -> -integer)
                                     .filter(integer -> integer <= 0)
                                     .map(integer -> -integer)
                                     .bufferTimeout(batch, Duration.ofMillis(50))
                                     .flatMap(Flux::fromIterable)
                                     .flatMap(i -> Flux.zip(Flux.just(i), otherStream, combinator))
            )
            .publishOn(sharedGroup)
            .doAfterTerminate(asyncGroup::dispose)
            .doOnError(Throwable::printStackTrace);
}