本文整理了Java中reactor.core.publisher.Flux.range()
方法的一些代码示例,展示了Flux.range()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.range()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:range
[英]Build a Flux that will only emit a sequence of count incrementing integers, starting from start. That is, emit integers between start (included) and start + count (excluded) then complete.
[中]构建一个通量,该通量将只从开始发出一个计数递增整数序列。也就是说,在开始(包括)和开始+计数(排除)之间发出整数,然后完成。
代码示例来源:origin: spring-projects/spring-framework
@GetMapping("/SPR-16051")
public Flux<String> errors() {
return Flux.range(1, 10000)
.map(i -> {
if (i == 1000) {
throw new RuntimeException("Random error");
}
return i + ". foo bar";
});
}
代码示例来源:origin: spring-projects/spring-framework
private Publisher<DataBuffer> multipleChunks() {
int chunkSize = RESPONSE_SIZE / CHUNKS;
return Flux.range(1, CHUNKS).map(integer -> randomBuffer(chunkSize));
}
代码示例来源:origin: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize() {
return Flux.range(1, 6)
.delayElements(Duration.ofMillis(300))
.bufferTimeout(5, Duration.ofMillis(2000));
}
代码示例来源:origin: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWithTimeoutAccumulateOnTimeOrSize2() {
return Flux.range(1, 6)
.delayElements(Duration.ofMillis(300))
.bufferTimeout(5, Duration.ofMillis(2000));
}
代码示例来源:origin: reactor/reactor-core
Flux<List<Integer>> scenario_bufferWithTimeoutThrowingExceptionOnTimeOrSizeIfDownstreamDemandIsLow() {
return Flux.range(1, 6)
.delayElements(Duration.ofMillis(300))
.bufferTimeout(5, Duration.ofMillis(100));
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000)
public void normal2() {
Queue<Integer> q = new ArrayBlockingQueue<>(1);
List<Integer> values = new ArrayList<>();
for (Integer i : Flux.range(1, 10)
.toIterable(1, () -> q)) {
values.add(i);
}
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000)
public void toStream() {
List<Integer> values = new ArrayList<>();
Flux.range(1, 10)
.toStream()
.forEach(values::add);
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void echo() throws Exception {
int count = 100;
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
ReplayProcessor<Object> output = ReplayProcessor.create(count);
this.client.execute(getUrl("/echo"), session -> session
.send(input.map(session::textMessage))
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
.subscribeWith(output)
.then())
.block(TIMEOUT);
assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void largerSkip() {
AssertSubscriber<List<Integer>> ts = AssertSubscriber.create();
Flux.range(1, 10).buffer(2, 3).subscribe(ts);
ts.assertValues(Arrays.asList(1, 2),
Arrays.asList(4, 5),
Arrays.asList(7, 8),
Arrays.asList(10))
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanOperatorNullTags() throws Exception {
Flux<Integer> source = Flux.range(1, 4);
FluxNameFuseable<Integer> test = new FluxNameFuseable<>(source, "foo", null);
assertThat(test.scan(Scannable.Attr.TAGS)).isNull();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void largerSkipEven() {
AssertSubscriber<List<Integer>> ts = AssertSubscriber.create();
Flux.range(1, 8).buffer(2, 3).subscribe(ts);
ts.assertValues(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8))
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal1() {
StepVerifier.create(Flux.range(1, 5)
.onBackpressureBuffer(Duration.ofMinutes(1), Integer.MAX_VALUE, v -> {}))
.expectNext(1, 2, 3, 4, 5)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void bufferLimit() {
StepVerifier.create(Flux.range(1, 5)
.onBackpressureBuffer(Duration.ofMinutes(1), 1, this, Schedulers.single()),
0)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(100))
.thenRequest(1)
.expectNext(5)
.verifyComplete();
assertThat(evicted).containsExactly(1, 2, 3, 4);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void prefetchMaxTranslatesToUnboundedRequest() {
AtomicLong requested = new AtomicLong();
StepVerifier.create(Flux.just(1, 2, 3).hide()
.doOnRequest(requested::set)
.concatMap(i -> Flux.range(0, i), Integer.MAX_VALUE))
.expectNext(0, 0, 1, 0, 1, 2)
.verifyComplete();
assertThat(requested.get())
.isNotEqualTo(Integer.MAX_VALUE)
.isEqualTo(Long.MAX_VALUE);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void namedHideFluxTest() {
Flux<Integer> named1 =
Flux.range(1, 10)
.hide()
.name("100s");
Flux<Integer> named2 = named1.filter(i -> i % 3 == 0)
.name("multiple of 3 100s")
.hide();
assertThat(Scannable.from(named1).name()).isEqualTo("100s");
assertThat(Scannable.from(named2).name()).isEqualTo("multiple of 3 100s");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void blockingLast() {
Assert.assertEquals((Integer) 10,
Flux.range(1, 10)
.publishOn(scheduler)
.blockLast());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalBoundary2() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 2)
.hide()
.concatMapDelayError(v -> Flux.range(v, 2))
.subscribe(ts);
ts.assertValues(1, 2, 2, 3)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void expectNextSequenceWithPartialMatchingSequenceNoMoreExpectation() {
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(() -> StepVerifier.create(Flux.range(1, 5))
.expectNextSequence(Arrays.asList(1, 2, 3))
.verifyComplete())
.withMessage("expectation \"expectComplete\" failed (expected: onComplete(); actual: onNext(4))");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void requestTrackingDisabledIfNotNamed() {
Flux<Integer> source = Flux.range(1, 10)
.hide();
new FluxMetrics<>(source, registry)
.blockLast();
DistributionSummary meter = registry.find(METER_REQUESTED)
.summary();
if (meter != null) { //meter could be null in some tests
assertThat(meter.count()).isZero();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void neverTriggered() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.delaySubscription(Flux.never())
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertNotComplete();
}
内容来源于网络,如有侵权,请联系作者删除!