本文整理了Java中reactor.core.publisher.Flux.buffer()
方法的一些代码示例,展示了Flux.buffer()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.buffer()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:buffer
[英]Collect all incoming values into a single List buffer that will be emitted by the returned Flux once this Flux completes.
[中]将所有传入值收集到单个列表缓冲区中,该缓冲区将在该流量完成后由返回的流量发出。
代码示例来源:origin: reactor/reactor-core
@Test
public void bufferWillSubdivideAnInputFlux() {
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8);
//"non overlapping buffers"
List<List<Integer>> res = numbers.buffer(2, 3)
.buffer()
.blockLast();
assertThat(res).containsExactly(Arrays.asList(1, 2),
Arrays.asList(4, 5),
Arrays.asList(7, 8));
}
代码示例来源:origin: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWillSubdivideAnInputFluxGapTime() {
return Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.delayElements(Duration.ofMillis(99))
.buffer(Duration.ofMillis(200), Duration.ofMillis(300));
}
代码示例来源:origin: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWillSubdivideAnInputFluxTime() {
return Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.delayElements(Duration.ofMillis(99))
.buffer(Duration.ofMillis(200));
}
代码示例来源:origin: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWillSubdivideAnInputFluxTime2() {
return Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
.delayElements(Duration.ofMillis(99))
.buffer(Duration.ofMillis(200));
}
代码示例来源:origin: reactor/reactor-core
Flux<List<Integer>> scenario_delayItems() {
return Flux.range(1, 4)
.buffer(2)
.delayElements(Duration.ofMillis(1000));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxEmptyBuffer() {
// "flux empty buffer"
// when:
List<List<Object>> ranges = Flux.empty()
.buffer(Flux.never())
.collectList()
.block(Duration.ofMillis(100));
// then:
assertThat(ranges).isEmpty();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxEmptyBufferJust() {
// "flux empty buffer just"() {
// when:
List<List<Object>> ranges = Flux.empty()
.buffer(Flux.just(1))
.collectList()
.block();
// then:
assertThat(ranges).isEmpty();
}
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, List<String>>> scenarios_operatorError() {
return Arrays.asList(
scenario(f -> f.buffer(Flux.never(), () -> null)),
scenario(f -> f.buffer(Flux.never(), () -> {
throw exception();
})));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalExact() {
AssertSubscriber<List<Integer>> ts = AssertSubscriber.create();
Flux.range(1, 10).buffer(2).subscribe(ts);
ts.assertValues(Arrays.asList(1, 2),
Arrays.asList(3, 4),
Arrays.asList(5, 6),
Arrays.asList(7, 8),
Arrays.asList(9, 10))
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, List<String>>> scenarios_operatorSuccess() {
return Arrays.asList(scenario(f -> f.buffer(Mono.never()))
.receive(i -> assertThat(i).containsExactly(item(0), item(1), item(2)))
.shouldAssertPostTerminateState(false),
scenario(f -> f.buffer(Mono.just(1)))
.receiverEmpty()
.shouldAssertPostTerminateState(false)
);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanOperator() {
ParallelFlux<List<Integer>> source = Flux.just(500, 300).buffer(1).parallel(10);
ParallelMergeSort<Integer> test = new ParallelMergeSort<>(source, Integer::compareTo);
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnSkippedElements() {
//the skip flavor should discard elements that are not added to any buffer
StepVerifier.create(Flux.just(1, 2, 3, 4, 5)
.buffer(2, 3)
.flatMapIterable(Function.identity()))
.expectNext(1, 2, 4, 5)
.expectComplete()
.verifyThenAssertThat()
.hasDiscardedExactly(3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnError() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.error(new IllegalStateException("boom")))
.buffer(4))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnErrorOverlap() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.error(new IllegalStateException("boom")))
.buffer(4, 2))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3, 3); //we already opened a 2nd buffer
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void supplierThrows() {
AssertSubscriber<Object> ts = AssertSubscriber.create();
Flux.range(1, 10).buffer(2, 1, () -> {
throw new RuntimeException("forced failure");
}).subscribe(ts);
ts.assertNoValues()
.assertError(RuntimeException.class)
.assertErrorMessage("forced failure")
.assertNotComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnCancel() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.never())
.buffer(4))
.thenAwait(Duration.ofMillis(10))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnError() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.error(new IllegalStateException("boom")))
.buffer(Mono.never()))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnCancel() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatWith(Mono.never())
.buffer(Mono.never()))
.thenAwait(Duration.ofMillis(10))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void supplierReturnsNull() {
AssertSubscriber<Object> ts = AssertSubscriber.create();
Flux.range(1, 10).buffer(2, 1, () -> null).subscribe(ts);
ts.assertNoValues()
.assertError(NullPointerException.class)
.assertNotComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardOnCancelOverlap() {
StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6)
.limitRequest(2)
.concatWith(Mono.never())
.buffer(4, 2))
.thenAwait(Duration.ofMillis(10))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2);
}
内容来源于网络,如有侵权,请联系作者删除!