本文整理了Java中reactor.core.publisher.Flux.delaySubscription()
方法的一些代码示例,展示了Flux.delaySubscription()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.delaySubscription()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:delaySubscription
[英]Delay the Flux#subscribe(Subscriber) to this Flux source until the given period elapses. The delay is introduced through the Schedulers#parallel() default Scheduler.
[中]延迟流量#订阅(订户)到此流量源,直到给定的周期过去。延迟是通过调度程序#parallel()默认调度程序引入的。
代码示例来源:origin: reactor/reactor-core
/**
* Delay the {@link Flux#subscribe(Subscriber) subscription} to this {@link Flux} source until the given
* period elapses, as measured on the user-provided {@link Scheduler}.
*
* <p>
* <img class="marble" src="doc-files/marbles/delaySubscriptionForFlux.svg" alt="">
*
* @param delay {@link Duration} before subscribing this {@link Flux}
* @param timer a time-capable {@link Scheduler} instance to run on
*
* @return a delayed {@link Flux}
*/
public final Flux<T> delaySubscription(Duration delay, Scheduler timer) {
return delaySubscription(Mono.delay(delay, timer));
}
代码示例来源:origin: reactor/reactor-core
/**
* Delay the {@link Flux#subscribe(Subscriber) subscription} to this {@link Flux} source until the given
* period elapses. The delay is introduced through the {@link Schedulers#parallel() parallel} default Scheduler.
*
* <p>
* <img class="marble" src="doc-files/marbles/delaySubscriptionForFlux.svg" alt="">
*
* @param delay duration before subscribing this {@link Flux}
*
* @return a delayed {@link Flux}
*
*/
public final Flux<T> delaySubscription(Duration delay) {
return delaySubscription(delay, Schedulers.parallel());
}
代码示例来源:origin: reactor/reactor-core
Flux<Integer> scenario_delayedTrigger(){
return Flux.just(1)
.delaySubscription(Duration.ofSeconds(3));
}
代码示例来源:origin: reactor/reactor-core
Flux<Integer> scenario_delayedTrigger2(){
return Flux.just(1)
.delaySubscription(Duration.ofMillis(50));
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void otherNull() {
Flux.never().delaySubscription((Publisher<?>)null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void windowWithTimeoutStartsTimerOnSubscription() {
StepVerifier.withVirtualTime(() ->
Mono.delay(Duration.ofMillis(300))
.thenMany(Flux.range(1, 3))
.delayElements(Duration.ofMillis(150))
.concatWith(Flux.range(4, 10).delaySubscription(Duration.ofMillis(500)))
.windowTimeout(10, Duration.ofMillis(500))
.flatMap(Flux::collectList)
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(100))
.assertNext(l -> assertThat(l).containsExactly(1))
.assertNext(l -> assertThat(l).containsExactly(2, 3))
.assertNext(l -> assertThat(l).containsExactly(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
.assertNext(l -> assertThat(l).isEmpty())
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testSubmitSession() throws Exception {
FluxProcessor<Integer, Integer> processor = EmitterProcessor.create();
AtomicInteger count = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
Scheduler scheduler = Schedulers.parallel();
processor.publishOn(scheduler)
.delaySubscription(Duration.ofMillis(1000))
.limitRate(1)
.subscribe(d -> {
count.incrementAndGet();
latch.countDown();
});
FluxSink<Integer> session = processor.sink();
session.next(1);
//System.out.println(emission);
session.complete();
latch.await(5, TimeUnit.SECONDS);
Assert.assertTrue("latch : " + count, count.get() == 1);
scheduler.dispose();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldSendOnErrorSignalWithDelaySubscription() {
Signal<? extends Long>[] first = new Signal[1];
RuntimeException error = new RuntimeException();
StepVerifier.create(Flux.<Long>error(error)
.switchOnFirst((s, f) -> {
first[0] = s;
return f.delaySubscription(Duration.ofMillis(100));
}))
.expectSubscription()
.expectError(RuntimeException.class)
.verify();
Assertions.assertThat(first).containsExactly(Signal.error(error));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void timeoutDropWhenNoCancelWithoutFallback() {
for (int i = 0; i < 50; i++) {
StepVerifier.withVirtualTime(
() -> Flux.just("cat")
.delaySubscription(Duration.ofMillis(3))
// We cancel on another scheduler that won't do anything to force it to act like
// the event is already in flight
.cancelOn(Schedulers.fromExecutor(r -> {}))
.timeout(Duration.ofMillis(2))
)
.thenAwait(Duration.ofSeconds(5))
.expectError(TimeoutException.class)
.verify();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldSendOnCompleteSignalWithDelaySubscription() {
Signal<? extends Long>[] first = new Signal[1];
StepVerifier.create(Flux.<Long>empty()
.switchOnFirst((s, f) -> {
first[0] = s;
return f.delaySubscription(Duration.ofMillis(100));
}))
.expectSubscription()
.expectComplete()
.verify();
Assertions.assertThat(first).containsExactly(Signal.complete());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void timeoutDropWhenNoCancelWithFallback() {
for (int i = 0; i < 50; i++) {
StepVerifier.withVirtualTime(
() -> Flux.just("cat")
.delaySubscription(Duration.ofMillis(3))
// We cancel on another scheduler that won't do anything to force it to act like
// the event is already in flight
.cancelOn(Schedulers.fromExecutor(r -> {}))
.timeout(Duration.ofMillis(2), Flux.just("dog").delayElements(Duration.ofMillis(5)))
)
.thenAwait(Duration.ofSeconds(5))
.expectNext("dog")
.expectComplete()
.verify();
}
}
代码示例来源:origin: reactor/reactor-core
.delaySubscription(Duration.ofMillis(1L))
.log("streamed")
.map(it -> it * 2)
代码示例来源:origin: reactor/reactor-core
@Test
public void emptyTrigger() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.delaySubscription(Flux.empty())
.subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void neverTriggered() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.delaySubscription(Flux.never())
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertNotComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.delaySubscription(Flux.just(1))
.subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void manyTriggered() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.delaySubscription(Flux.range(1, 10))
.subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void verifyVirtualTimeOnComplete() {
StepVerifier.withVirtualTime(() -> Flux.empty()
.delaySubscription(Duration.ofHours(1))
.log())
.thenAwait(Duration.ofHours(1))
.expectComplete()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void emptyTriggerBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 10)
.delaySubscription(Flux.empty())
.subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertNoError();
ts.request(2);
ts.assertValues(1, 2)
.assertNotComplete()
.assertNoError();
ts.request(5);
ts.assertValues(1, 2, 3, 4, 5, 6, 7)
.assertNotComplete()
.assertNoError();
ts.request(10);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void manyTriggeredBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 10)
.delaySubscription(Flux.range(1, 10))
.subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertNoError();
ts.request(2);
ts.assertValues(1, 2)
.assertNotComplete()
.assertNoError();
ts.request(5);
ts.assertValues(1, 2, 3, 4, 5, 6, 7)
.assertNotComplete()
.assertNoError();
ts.request(10);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 10)
.delaySubscription(Flux.just(1))
.subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertNoError();
ts.request(2);
ts.assertValues(1, 2)
.assertNotComplete()
.assertNoError();
ts.request(5);
ts.assertValues(1, 2, 3, 4, 5, 6, 7)
.assertNotComplete()
.assertNoError();
ts.request(10);
ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoError()
.assertComplete();
}
内容来源于网络,如有侵权,请联系作者删除!