本文整理了Java中reactor.core.publisher.Flux.cancelOn()
方法的一些代码示例,展示了Flux.cancelOn()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.cancelOn()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:cancelOn
[英]Prepare this Flux so that subscribers will cancel from it on a specified Scheduler.
[中]准备此流量,以便订阅者在指定的计划程序上取消此流量。
代码示例来源:origin: reactor/reactor-core
@Test
public void cancelOn() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Thread> thread = new AtomicReference<>();
Disposable res = Flux.never()
.doOnCancel(() -> {
thread.set(Thread.currentThread());
countDownLatch.countDown();
})
.cancelOn(asyncGroup)
.subscribe();
res.dispose();
assertTrue(countDownLatch.await(3, TimeUnit.SECONDS));
assertTrue(thread.get() != Thread.currentThread());
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 3000L)
public void cancelOnDedicatedScheduler() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Thread> threadHash = new AtomicReference<>(Thread.currentThread());
Schedulers.single().schedule(() -> threadHash.set(Thread.currentThread()));
Flux.create(sink -> {
sink.onDispose(() -> {
if (threadHash.compareAndSet(Thread.currentThread(), null)) {
latch.countDown();
}
});
})
.cancelOn(Schedulers.single())
.subscribeWith(AssertSubscriber.create())
.cancel();
latch.await();
Assert.assertNull(threadHash.get());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanOperator() {
final Flux<Integer> flux = Flux.just(1).cancelOn(Schedulers.elastic());
assertThat(flux).isInstanceOf(Scannable.class);
assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.elastic());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void timeoutDropWhenNoCancelWithoutFallback() {
for (int i = 0; i < 50; i++) {
StepVerifier.withVirtualTime(
() -> Flux.just("cat")
.delaySubscription(Duration.ofMillis(3))
// We cancel on another scheduler that won't do anything to force it to act like
// the event is already in flight
.cancelOn(Schedulers.fromExecutor(r -> {}))
.timeout(Duration.ofMillis(2))
)
.thenAwait(Duration.ofSeconds(5))
.expectError(TimeoutException.class)
.verify();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void timeoutDropWhenNoCancelWithFallback() {
for (int i = 0; i < 50; i++) {
StepVerifier.withVirtualTime(
() -> Flux.just("cat")
.delaySubscription(Duration.ofMillis(3))
// We cancel on another scheduler that won't do anything to force it to act like
// the event is already in flight
.cancelOn(Schedulers.fromExecutor(r -> {}))
.timeout(Duration.ofMillis(2), Flux.just("dog").delayElements(Duration.ofMillis(5)))
)
.thenAwait(Duration.ofSeconds(5))
.expectNext("dog")
.expectComplete()
.verify();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void prematureCancel2() {
StepVerifier.create(Flux.range(1, 10000)
.flatMap(Flux::just, 2)
.cancelOn(Schedulers.single()), 1)
.expectNext(1)
.thenRequest(2)
.expectNext(2, 3)
.thenCancel()
.verify();
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param scheduler
* @return
* @see reactor.core.publisher.Flux#cancelOn(reactor.core.scheduler.Scheduler)
*/
public final Flux<T> cancelOn(Scheduler scheduler) {
return boxed.cancelOn(scheduler);
}
/**
内容来源于网络,如有侵权,请联系作者删除!