Usage and code examples for reactor.core.publisher.Flux.publishOn()

x33g5p2x, reposted on 2022-01-19 under "Other"

This article collects code examples for the Java method reactor.core.publisher.Flux.publishOn() and shows how Flux.publishOn() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects. Details of Flux.publishOn():
Package: reactor.core.publisher
Class: Flux
Method: publishOn

About Flux.publishOn

Run onNext, onComplete and onError on a Worker obtained from the supplied Scheduler.

This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.
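The effect on the threading context can be observed by recording the thread name at each stage. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name `PublishOnThreads`, the stage labels, and the thread-name prefixes `first` and `second` are made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnThreads {

    // Runs a small chain and records which thread executed each stage.
    public static Map<String, String> run() {
        // "first" and "second" are hypothetical thread-name prefixes chosen for this sketch.
        Scheduler first = Schedulers.newSingle("first");
        Scheduler second = Schedulers.newSingle("second");
        Map<String, String> threads = new ConcurrentHashMap<>();
        try {
            Flux.range(1, 3)
                // Above any publishOn: runs on the thread that subscribes (here, the caller).
                .doOnNext(v -> threads.put("source", Thread.currentThread().getName()))
                .publishOn(first)
                // Below the first publishOn: runs on the "first" scheduler.
                .doOnNext(v -> threads.put("afterFirst", Thread.currentThread().getName()))
                .publishOn(second)
                // Below the second publishOn: runs on the "second" scheduler.
                .doOnNext(v -> threads.put("afterSecond", Thread.currentThread().getName()))
                .blockLast();
        } finally {
            // Release the dedicated threads created for this sketch.
            first.dispose();
            second.dispose();
        }
        return threads;
    }

    public static void main(String[] args) {
        run().forEach((stage, thread) -> System.out.println(stage + " ran on " + thread));
    }
}
```

Running `main` prints one line per stage. The stages below each publishOn should report a thread carrying the matching scheduler prefix, while the source stage reports the subscribing thread.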

Typically used for fast publisher, slow consumer(s) scenarios.

flux.publishOn(Schedulers.single()).subscribe()
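For the fast publisher, slow consumer case, the overload publishOn(Scheduler, int prefetch) bounds how many elements are requested from upstream at a time. The sketch below, assuming reactor-core is on the classpath, logs the upstream request amounts with doOnRequest; the class name `PublishOnPrefetch`, the prefetch of 8, the element count of 40, and the 1 ms delay are arbitrary choices for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnPrefetch {

    // Request amounts seen by the source, in order of arrival.
    public static final List<Long> requests = new CopyOnWriteArrayList<>();

    // Simulates a slow consumer; the delay is an arbitrary value for this sketch.
    private static void pause() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns the number of elements the slow consumer processed.
    public static int run() {
        requests.clear();
        Scheduler consumer = Schedulers.newSingle("slow-consumer"); // hypothetical name
        AtomicInteger consumed = new AtomicInteger();
        try {
            Flux.range(1, 40)                          // fast, synchronous source
                .doOnRequest(n -> requests.add(n))     // log what publishOn asks for
                .publishOn(consumer, 8)                // buffer at most 8 elements
                .doOnNext(v -> {
                    pause();
                    consumed.incrementAndGet();
                })
                .blockLast();
        } finally {
            consumer.dispose();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed=" + run() + ", upstream requests=" + requests);
    }
}
```

Even though blockLast() signals unbounded demand, the source should only ever see requests of at most the prefetch size, so the fast publisher cannot run arbitrarily far ahead of the slow consumer.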


Code examples

Example source: resilience4j/resilience4j

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  source.publishOn(scheduler)
      .subscribe(new BulkheadSubscriber<>(bulkhead, actual));
}

Example source: resilience4j/resilience4j

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  source.publishOn(scheduler)
      .subscribe(new RateLimiterSubscriber<>(rateLimiter, actual));
}

Example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  return response.writeWith(Flux.just("h", "e", "l", "l", "o")
      .delayElements(Duration.ofMillis(100))
      .publishOn(asyncGroup)
      .collect(dataBufferFactory::allocateBuffer, (buffer, str) -> buffer.write(str.getBytes())));
}

Example source: reactor/reactor-core

@Test(expected = RuntimeException.class)
public void blockingFirstError() {
  Flux.error(new RuntimeException("test"))
    .publishOn(scheduler)
    .blockFirst();
}

Example source: reactor/reactor-core

@Test
public void subscribeOnDispatchOn() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(100);
  Flux.range(1, 100)
      .log("testOn", Level.FINE)
      .subscribeOn(ioGroup)
      .publishOn(asyncGroup)
      .limitRate(1)
      .subscribe(t -> latch.countDown());
  assertThat("Not totally dispatched", latch.await(30, TimeUnit.SECONDS));
}

Example source: reactor/reactor-core

@Test
public void blockingLast() {
  Assert.assertEquals((Integer) 10,
      Flux.range(1, 10)
        .publishOn(scheduler)
        .blockLast());
}

Example source: reactor/reactor-core

@Test
public void blockingFirst() {
  Assert.assertEquals((Integer) 1,
      Flux.range(1, 10)
        .publishOn(scheduler)
        .blockFirst());
}

Example source: reactor/reactor-core

@Test
public void classicJust() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.just(1)
    .publishOn(Schedulers.fromExecutorService(exec))
    .subscribe(ts);
  ts.await(Duration.ofSeconds(5));
  ts.assertValues(1)
   .assertNoError()
   .assertComplete();
}

Example source: reactor/reactor-core

@Test
public void publishOnFilter() throws Exception {
  Flux<Long> flux = Flux.interval(Duration.ofMillis(2)).take(255)
             .publishOn(scheduler)
             .filter(t -> true)
             .doOnNext(i -> onNext(i))
             .doOnError(e -> onError(e));
  verifyRejectedExecutionConsistency(flux, 5);
}

Example source: reactor/reactor-core

@Test
public void syncSourceWithNull() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.just(1, null, 1)
    .publishOn(Schedulers.fromExecutorService(exec))
    .subscribe(ts);
  ts.await(Duration.ofSeconds(5));
  ts.assertValues(1)
   .assertError(NullPointerException.class)
   .assertNotComplete();
}

Example source: reactor/reactor-core

@Test
public void subscribeWithAsyncFusion() {
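  // Note: EmitterProcessor and Schedulers.elastic() are deprecated as of Reactor 3.4;
  // this test targets an older reactor-core version.
  Processor<Integer, Integer> processor = EmitterProcessor.create(16);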
  StepVerifier.create(processor)
        .then(() -> Flux.range(1, 5).publishOn(Schedulers.elastic()).subscribe(processor))
        .expectNext(1, 2, 3, 4, 5)
        .expectComplete()
        .verify(Duration.ofSeconds(1));
}

Example source: reactor/reactor-core

@Test
public void crossRange() {
  int count = 1000000;
  StepVerifier.create(Flux.range(1, count)
              .flatMap(v -> Flux.range(v, 2), false, 128, 1)
              .publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(2 * count)
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void crossRangeMax() {
  int count = 1000000;
  StepVerifier.create(Flux.range(1, count)
              .flatMap(v -> Flux.range(v, 2), false, 128, 32)
              .publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(2 * count)
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void expectNextAsync() {
  Flux<String> flux = Flux.just("foo", "bar")
              .publishOn(Schedulers.parallel());
  StepVerifier.create(flux)
        .expectNext("foo")
        .expectNext("bar")
        .expectComplete()
        .verify();
}

Example source: reactor/reactor-core

@Test
public void verifyFusionModeExpected2() {
  Flux<String> flux = Flux.just("foo", "bar")
              .publishOn(Schedulers.immediate());
  StepVerifier.create(flux)
        .expectFusion(Fuseable.SYNC | Fuseable.ASYNC, Fuseable.ASYNC)
        .expectNext("foo", "bar")
        .expectComplete()
        .verify();
}

Example source: reactor/reactor-core

@Test
public void crossRangeMaxUnbounded() {
  int count = 1000000;
  StepVerifier.create(Flux.range(1, count)
              .flatMap(v -> Flux.range(v, 2))
              .publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(2 * count)
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void discardPollAsyncPredicateMiss() {
  StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
              .publishOn(Schedulers.newSingle("discardPollAsync"))
              .filter(i -> i % 2 == 0)
  )
        .expectFusion(Fuseable.ASYNC)
        .expectNextCount(5)
        .expectComplete()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 3, 5, 7, 9);
}

Example source: reactor/reactor-core

@Test
public void fluxCreateError2(){
  StepVerifier.create(Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  }, FluxSink.OverflowStrategy.ERROR).publishOn(Schedulers.parallel()))
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void fluxCreateIgnore2(){
  StepVerifier.create(Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  }, FluxSink.OverflowStrategy.IGNORE).publishOn(Schedulers.parallel()))
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}

Example source: reactor/reactor-core

@Test
public void crossRangeMaxHidden() throws Exception {
  int count = 1000000;
  StepVerifier.create(Flux.range(1, count)
              .hide()
              .flatMap(v -> Flux.range(v, 2)
                       .hide(), false, 4, 32)
              .hide()
              .publishOn(Schedulers.fromExecutorService(exec)))
        .expectNextCount(2 * count)
        .verifyComplete();
}
