本文整理了Java中reactor.core.publisher.Flux.doOnComplete()
方法的一些代码示例,展示了Flux.doOnComplete()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.doOnComplete()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:doOnComplete
[英]Add behavior (side-effect) triggered when the Flux completes successfully.
[中]添加流量成功完成时触发的行为(副作用)。
代码示例来源:origin: spring-projects/spring-framework
public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
@Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
if (publisher != null && publisherNested != null) {
throw new IllegalArgumentException("At most one publisher expected");
}
this.publisher = publisher != null ?
Flux.from(publisher)
.doOnSubscribe(s -> this.hasContentConsumer = true)
.doOnNext(this.buffer::write)
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete) : null;
this.publisherNested = publisherNested != null ?
Flux.from(publisherNested)
.doOnSubscribe(s -> this.hasContentConsumer = true)
.map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError))
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete) : null;
if (publisher == null && publisherNested == null) {
this.content.onComplete();
}
}
代码示例来源:origin: reactor/reactor-core
private void subscribe(Flux<Flux<Integer>> windows) {
mainSubscriber = AssertSubscriber.create();
windows.doOnCancel(() -> mainCancelled.incrementAndGet())
.doOnComplete(() -> mainCompleted.incrementAndGet())
.doOnTerminate(() -> mainTerminated.incrementAndGet()).subscribe(mainSubscriber);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void streamStateRelatedSignalsCanBeConsumed() {
// "Stream "state" related signals can be consumed"
// given: "a composable with values 1 to 5 inclusive"
Flux<Integer> stream = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5));
List<Integer> values = new ArrayList<>();
List<String> signals = new ArrayList<>();
// when: "a Subscribe Consumer is registered"
stream = stream.doOnSubscribe(s -> signals.add("subscribe"));
// and: "a Cancel Consumer is registered"
stream = stream.doOnCancel(() -> signals.add("cancel"));
// and: "a Complete Consumer is registered"
stream = stream.doOnComplete(() -> signals.add("complete"));
// and: "the flux is consumed"
stream.subscribe(values::add);
// then: "the initial values are passed"
assertThat(values).containsExactly(1, 2, 3, 4, 5);
assertThat(signals).containsExactly("subscribe", "complete");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void streamCanEmitDefaultValueIfEmpty() {
// "Stream can emit a default value if empty"
// given: "a composable that only completes"
Flux<String> stream = Flux.empty();
List<String> values = new ArrayList<>();
// when: "a Subscribe Consumer is registered"
stream = stream.defaultIfEmpty("test")
.doOnComplete(() -> values.add("complete"));
// and: "the flux is consumed"
stream.subscribe(values::add);
// then: "the initial values are passed"
assertThat(values).containsExactly("test", "complete");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void syncPollCompleteCalled() {
AtomicBoolean onComplete = new AtomicBoolean();
ConnectableFlux<Integer> f = Flux.just(1)
.doOnComplete(() -> onComplete.set(true))
.publish();
StepVerifier.create(f)
.then(f::connect)
.expectNext(1)
.verifyComplete();
assertThat(onComplete.get()).withFailMessage("onComplete not called back").isTrue();
}
代码示例来源:origin: reactor/reactor-core
private void expectWindow(int index, Predicate<? super Integer> innerCancelPredicate, List<Integer> values) {
AssertSubscriber<Integer> s = AssertSubscriber.create();
mainSubscriber.values().get(index)
.doOnCancel(() -> innerCancelled.incrementAndGet())
.doOnComplete(() -> {
innerCompleted.incrementAndGet();})
.doOnTerminate(() -> innerTerminated.incrementAndGet())
.takeWhile(innerCancelPredicate).subscribe(s);
s.assertValueSequence(values).assertNoError();
innerCreated.incrementAndGet();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void syncPollConditionalCompleteCalled() {
AtomicBoolean onComplete = new AtomicBoolean();
ConnectableFlux<Integer> f = Flux.just(1)
.doOnComplete(() -> onComplete.set(true))
.filter(v -> true)
.publish();
StepVerifier.create(f)
.then(f::connect)
.expectNext(1)
.verifyComplete();
assertThat(onComplete.get()).withFailMessage("onComplete not called back").isTrue();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void noFusionCompleteCalled() {
AtomicBoolean onComplete = new AtomicBoolean();
AssertSubscriber<Object> ts = AssertSubscriber.create();
Flux.range(1, 2)
.doOnComplete(() -> onComplete.set(true))
.subscribe(ts);
ts.assertNoError()
.assertValues(1, 2)
.assertComplete();
Assert.assertTrue("onComplete not called back", onComplete.get());
}
代码示例来源:origin: line/armeria
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
response.setStatusCode(HttpStatus.OK);
return request.getBody().map(data -> {
// skip data, then throw an exception.
throw HttpStatusException.of(com.linecorp.armeria.common.HttpStatus.BAD_REQUEST);
}).doOnComplete(() -> {
// An HTTP GET request doesn't have a body, so onComplete will be immediately called.
throw HttpStatusException.of(com.linecorp.armeria.common.HttpStatus.BAD_REQUEST);
}).then();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void syncCompleteCalled() {
AtomicBoolean onComplete = new AtomicBoolean();
AssertSubscriber<Object> ts = AssertSubscriber.create();
Flux.range(1, 2)
.hide()
.doOnComplete(() -> onComplete.set(true))
.subscribe(ts);
ts.assertNoError()
.assertValues(1, 2)
.assertComplete();
Assert.assertTrue("onComplete not called back", onComplete.get());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void deferredFluxInitialValueLaterAvailableUpToLongMax() throws InterruptedException {
// "A deferred Flux with an initial value makes that value available later up to Long.MAX "
// given: "a composable with an initial value"
AtomicReference<Throwable> e = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Flux<Integer> stream = Flux.fromIterable(Arrays.asList(1, 2, 3))
.publish()
.autoConnect()
.doOnError(e::set)
.doOnComplete(latch::countDown);
// when: "cumulated request of Long MAX"
long test = Long.MAX_VALUE / 2L;
AssertSubscriber<Integer> controls =
stream.subscribeWith(AssertSubscriber.create(0));
controls.request(test);
controls.request(test);
controls.request(1);
//sleep(2000)
// then: "no error available"
latch.await(2, TimeUnit.SECONDS);
assertThat(e.get()).isNull();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void flatMapSignal2() {
StepVerifier.create(Mono.just(1)
.flatMapMany(d -> Flux.just(d * 2),
e -> Flux.just(99),
() -> Flux.just(10)).doOnComplete(() -> {
System.out.println("test");
})
.log())
.expectNext(2, 10)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
.doOnComplete(() -> {
if (ref.get() != null) {
Scannable t = ref.get();
代码示例来源:origin: reactor/reactor-core
.doOnNext(i -> processor.onNext(retainedDetector.tracked(new Wrapper(i))))
.doOnError(processor::onError)
.doOnComplete(processor::onComplete);
retainedDetector.finalizedCount()))
.doOnNext(v -> LOGGER.debug(v.toString()))
.doOnComplete(latch::countDown)
.collectList();
代码示例来源:origin: reactor/reactor-core
.delayElements(Duration.ofMillis(10))
.doOnNext(i -> processor.onNext(finalizedTracker.tracked(new Wrapper(i))))
.doOnComplete(processor::onComplete);
.doOnComplete(latch::countDown)
.collectList();
代码示例来源:origin: reactor/reactor-core
@Test
public void completeCallbackError() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Throwable err = new Exception("test");
Flux.just(1)
.doOnComplete(() -> {
throw Exceptions.propagate(err);
})
.subscribe(ts);
//nominal error path (DownstreamException)
ts.assertErrorMessage("test");
ts = AssertSubscriber.create();
try {
Flux.just(1)
.doOnComplete(() -> {
throw Exceptions.bubble(err);
})
.subscribe(ts);
Assert.fail();
}
catch (Exception e) {
Assert.assertTrue(Exceptions.unwrap(e) == err);
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void completeCallbackError() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Throwable err = new Exception("test");
Flux.just(1)
.hide()
.doOnComplete(() -> {
throw Exceptions.propagate(err);
})
.subscribe(ts);
//nominal error path (DownstreamException)
ts.assertErrorMessage("test");
ts = AssertSubscriber.create();
try {
Flux.just(1)
.hide()
.doOnComplete(() -> {
throw Exceptions.bubble(err);
})
.subscribe(ts);
fail();
}
catch (Exception e) {
Assert.assertTrue(Exceptions.unwrap(e) == err);
}
}
代码示例来源:origin: reactor/reactor-core
})).producerNever(),
scenario(f -> f.doOnComplete(() -> {
throw exception();
}))
}, null, null)).producerEmpty(),
scenario(f -> f.doOnComplete(() -> {
throw exception();
})).producerEmpty(),
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
return Arrays.asList(scenario(f -> f.doOnSubscribe(s -> {
})),
scenario(f -> f.doOnError(s -> {
})),
scenario(f -> f.doOnTerminate(() -> {
})),
scenario(f -> f.doAfterTerminate(() -> {
})),
scenario(f -> f.doOnCancel(() -> {
})),
scenario(f -> f.doOnComplete(() -> {
})),
scenario(f -> f.doOnRequest(d -> {
})),
scenario(f -> f.doOnRequest(s -> {
throw new RuntimeException(); //ignored
})),
scenario(f -> f.doOnNext(s -> {
})),
scenario(f -> f.doOnError(s -> {
})));
}
代码示例来源:origin: rsocket/rsocket-java
@Override
public Flux<Payload> requestStream(Payload payload) {
return source
.requestStream(payload)
.doOnError(th -> errorPercentage.insert(0.0))
.doOnComplete(() -> updateErrorPercentage(1.0));
}
内容来源于网络,如有侵权,请联系作者删除!