reactor.core.publisher.Flux.cancelOn()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(3.1k)|赞(0)|评价(0)|浏览(236)

本文整理了Java中reactor.core.publisher.Flux.cancelOn()方法的一些代码示例,展示了Flux.cancelOn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.cancelOn()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:cancelOn

Flux.cancelOn介绍

[英]Prepare this Flux so that subscribers will cancel from it on a specified Scheduler.
[中]准备此流量,以便订阅者在指定的计划程序上取消此流量。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test
public void cancelOn() throws Exception {
  CountDownLatch countDownLatch = new CountDownLatch(1);
  AtomicReference<Thread> thread = new AtomicReference<>();
  Disposable res = Flux.never()
             .doOnCancel(() -> {
             thread.set(Thread.currentThread());
             countDownLatch.countDown();
           })
             .cancelOn(asyncGroup)
             .subscribe();
  res.dispose();
  assertTrue(countDownLatch.await(3, TimeUnit.SECONDS));
  assertTrue(thread.get() != Thread.currentThread());
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 3000L)
public void cancelOnDedicatedScheduler() throws Exception {
  CountDownLatch latch = new CountDownLatch(1);
  AtomicReference<Thread> threadHash = new AtomicReference<>(Thread.currentThread());
  Schedulers.single().schedule(() -> threadHash.set(Thread.currentThread()));
  Flux.create(sink -> {
    sink.onDispose(() -> {
      if (threadHash.compareAndSet(Thread.currentThread(), null)) {
        latch.countDown();
      }
    });
  })
    .cancelOn(Schedulers.single())
    .subscribeWith(AssertSubscriber.create())
    .cancel();
  latch.await();
  Assert.assertNull(threadHash.get());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOperator() {
  final Flux<Integer> flux = Flux.just(1).cancelOn(Schedulers.elastic());
  assertThat(flux).isInstanceOf(Scannable.class);
  assertThat(((Scannable) flux).scan(Scannable.Attr.RUN_ON)).isSameAs(Schedulers.elastic());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void timeoutDropWhenNoCancelWithoutFallback() {
  for (int i = 0; i < 50; i++) {
    StepVerifier.withVirtualTime(
        () -> Flux.just("cat")
             .delaySubscription(Duration.ofMillis(3))
             // We cancel on another scheduler that won't do anything to force it to act like
             // the event is already in flight
             .cancelOn(Schedulers.fromExecutor(r -> {}))
             .timeout(Duration.ofMillis(2))
    )
          .thenAwait(Duration.ofSeconds(5))
          .expectError(TimeoutException.class)
          .verify();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void timeoutDropWhenNoCancelWithFallback() {
  for (int i = 0; i < 50; i++) {
    StepVerifier.withVirtualTime(
        () -> Flux.just("cat")
             .delaySubscription(Duration.ofMillis(3))
             // We cancel on another scheduler that won't do anything to force it to act like
             // the event is already in flight
             .cancelOn(Schedulers.fromExecutor(r -> {}))
             .timeout(Duration.ofMillis(2), Flux.just("dog").delayElements(Duration.ofMillis(5)))
    )
          .thenAwait(Duration.ofSeconds(5))
          .expectNext("dog")
          .expectComplete()
          .verify();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void prematureCancel2() {
  StepVerifier.create(Flux.range(1, 10000)
              .flatMap(Flux::just, 2)
              .cancelOn(Schedulers.single()), 1)
        .expectNext(1)
        .thenRequest(2)
        .expectNext(2, 3)
        .thenCancel()
        .verify();
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param scheduler
 * @return
 * @see reactor.core.publisher.Flux#cancelOn(reactor.core.scheduler.Scheduler)
 */
public final Flux<T> cancelOn(Scheduler scheduler) {
  return boxed.cancelOn(scheduler);
}
/**

相关文章

Flux类方法