本文整理了Java中reactor.core.publisher.Flux.publishOn()
方法的一些代码示例,展示了Flux.publishOn()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.publishOn()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:publishOn
[英]Run onNext, onComplete and onError on a supplied SchedulerWorker.
This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
flux.publishOn(Schedulers.single()).subscribe()
[中]在提供的SchedulerWorker上运行onNext、onComplete和onError。
此操作符影响线程上下文,在线程上下文中,它下面的链中的其他操作符将执行线程上下文,直到出现新的publishOn。
通常用于快速发布、慢速消费场景。
flux.publishOn(Schedulers.single()).subscribe()
代码示例来源:origin: resilience4j/resilience4j
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.publishOn(scheduler)
.subscribe(new BulkheadSubscriber<>(bulkhead, actual));
}
代码示例来源:origin: resilience4j/resilience4j
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.publishOn(scheduler)
.subscribe(new RateLimiterSubscriber<>(rateLimiter, actual));
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.writeWith(Flux.just("h", "e", "l", "l", "o")
.delayElements(Duration.ofMillis(100))
.publishOn(asyncGroup)
.collect(dataBufferFactory::allocateBuffer, (buffer, str) -> buffer.write(str.getBytes())));
}
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = RuntimeException.class)
public void blockingFirstError() {
Flux.error(new RuntimeException("test"))
.publishOn(scheduler)
.blockFirst();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void subscribeOnDispatchOn() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(100);
Flux.range(1, 100)
.log("testOn", Level.FINE)
.subscribeOn(ioGroup)
.publishOn(asyncGroup)
.limitRate(1)
.subscribe(t -> latch.countDown());
assertThat("Not totally dispatched", latch.await(30, TimeUnit.SECONDS));
}
@Test
代码示例来源:origin: reactor/reactor-core
@Test
public void blockingLast() {
Assert.assertEquals((Integer) 10,
Flux.range(1, 10)
.publishOn(scheduler)
.blockLast());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void blockingFirst() {
Assert.assertEquals((Integer) 1,
Flux.range(1, 10)
.publishOn(scheduler)
.blockFirst());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void classicJust() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.just(1)
.publishOn(Schedulers.fromExecutorService(exec))
.subscribe(ts);
ts.await(Duration.ofSeconds(5));
ts.assertValues(1)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publishOnFilter() throws Exception {
Flux<Long> flux = Flux.interval(Duration.ofMillis(2)).take(255)
.publishOn(scheduler)
.filter(t -> true)
.doOnNext(i -> onNext(i))
.doOnError(e -> onError(e));
verifyRejectedExecutionConsistency(flux, 5);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void syncSourceWithNull() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.just(1, null, 1)
.publishOn(Schedulers.fromExecutorService(exec))
.subscribe(ts);
ts.await(Duration.ofSeconds(5));
ts.assertValues(1)
.assertError(NullPointerException.class)
.assertNotComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void subscribeWithAsyncFusion() {
Processor<Integer, Integer> processor = EmitterProcessor.create(16);
StepVerifier.create(processor)
.then(() -> Flux.range(1, 5).publishOn(Schedulers.elastic()).subscribe(processor))
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
.verify(Duration.ofSeconds(1));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void crossRange() {
int count = 1000000;
StepVerifier.create(Flux.range(1, count)
.flatMap(v -> Flux.range(v, 2), false, 128, 1)
.publishOn(Schedulers.fromExecutorService(exec)))
.expectNextCount(2 * count)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void crossRangeMax() {
int count = 1000000;
StepVerifier.create(Flux.range(1, count)
.flatMap(v -> Flux.range(v, 2), false, 128, 32)
.publishOn(Schedulers.fromExecutorService(exec)))
.expectNextCount(2 * count)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void expectNextAsync() {
Flux<String> flux = Flux.just("foo", "bar")
.publishOn(Schedulers.parallel());
StepVerifier.create(flux)
.expectNext("foo")
.expectNext("bar")
.expectComplete()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void verifyFusionModeExpected2() {
Flux<String> flux = Flux.just("foo", "bar")
.publishOn(Schedulers.immediate());
StepVerifier.create(flux)
.expectFusion(Fuseable.SYNC | Fuseable.ASYNC, Fuseable.ASYNC)
.expectNext("foo", "bar")
.expectComplete()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void crossRangeMaxUnbounded() {
int count = 1000000;
StepVerifier.create(Flux.range(1, count)
.flatMap(v -> Flux.range(v, 2))
.publishOn(Schedulers.fromExecutorService(exec)))
.expectNextCount(2 * count)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardPollAsyncPredicateMiss() {
StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
.publishOn(Schedulers.newSingle("discardPollAsync"))
.filter(i -> i % 2 == 0)
)
.expectFusion(Fuseable.ASYNC)
.expectNextCount(5)
.expectComplete()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 3, 5, 7, 9);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxCreateError2(){
StepVerifier.create(Flux.create(s -> {
s.next("test1");
s.next("test2");
s.next("test3");
s.complete();
}, FluxSink.OverflowStrategy.ERROR).publishOn(Schedulers.parallel()))
.expectNext("test1", "test2", "test3")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxCreateIgnore2(){
StepVerifier.create(Flux.create(s -> {
s.next("test1");
s.next("test2");
s.next("test3");
s.complete();
}, FluxSink.OverflowStrategy.IGNORE).publishOn(Schedulers.parallel()))
.expectNext("test1", "test2", "test3")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void crossRangeMaxHidden() throws Exception {
int count = 1000000;
StepVerifier.create(Flux.range(1, count)
.hide()
.flatMap(v -> Flux.range(v, 2)
.hide(), false, 4, 32)
.hide()
.publishOn(Schedulers.fromExecutorService(exec)))
.expectNextCount(2 * count)
.verifyComplete();
}
内容来源于网络,如有侵权,请联系作者删除!