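This article collects code examples for the Java method reactor.core.publisher.Flux.doOnSignal() and shows how Flux.doOnSignal() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of the Flux.doOnSignal() method are as follows: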
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: doOnSignal
No description available.
Code example source: reactor/reactor-core
/**
 * Add behavior (side-effect) triggered when the {@link Flux} completes with an error.
 * <p>
 * <img class="marble" src="doc-files/marbles/doOnErrorForFlux.svg" alt="">
 *
 * @param onError the callback to call on {@link Subscriber#onError}
 *
 * @return an observed {@link Flux}
 */
public final Flux<T> doOnError(Consumer<? super Throwable> onError) {
    Objects.requireNonNull(onError, "onError");
    return doOnSignal(this, null, null, onError, null, null, null, null);
}
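The wrapper above fills exactly one slot of `doOnSignal` with its callback and passes `null` for every other slot. As a rough illustration of that delegation pattern, here is a minimal sketch built only on the JDK. `MiniSource`, `Sink`, and `DelegationDemo` are hypothetical stand-ins invented for this example, with only two hook slots. They are not Reactor classes and do not reproduce Reactor's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical demo types throughout; none of this is Reactor's implementation.
final class DelegationDemo {
    /** Attaches both hooks to the source and records every signal in order. */
    static List<String> run(MiniSource<Integer> source) {
        List<String> log = new ArrayList<>();
        source.doOnError(e -> log.add("hook error: " + e.getMessage()))
              .doOnComplete(() -> log.add("hook complete"))
              .subscribe(new Sink<Integer>() {
                  @Override public void next(Integer value) { log.add("next " + value); }
                  @Override public void error(Throwable e) { log.add("failed: " + e.getMessage()); }
                  @Override public void complete() { log.add("completed"); }
              });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(MiniSource.fromList(Arrays.asList(1, 2))));
        System.out.println(run(MiniSource.<Integer>error(new IllegalStateException("boom"))));
    }
}

/** Receives the signals of a MiniSource. */
interface Sink<T> {
    void next(T value);
    void error(Throwable e);
    void complete();
}

final class MiniSource<T> {
    private final Consumer<Sink<T>> generator;

    private MiniSource(Consumer<Sink<T>> generator) {
        this.generator = generator;
    }

    /** Emits every list item, then completes. */
    static <T> MiniSource<T> fromList(List<T> items) {
        return new MiniSource<T>(sink -> {
            for (T item : items) {
                sink.next(item);
            }
            sink.complete();
        });
    }

    /** Fails immediately with the given error. */
    static <T> MiniSource<T> error(Throwable e) {
        return new MiniSource<T>(sink -> sink.error(e));
    }

    void subscribe(Sink<T> sink) {
        generator.accept(sink);
    }

    // One shared helper: each hook slot is optional, and null means "no hook here".
    static <T> MiniSource<T> doOnSignal(MiniSource<T> source,
                                        Consumer<? super Throwable> onError,
                                        Runnable onComplete) {
        return new MiniSource<T>(downstream -> source.subscribe(new Sink<T>() {
            @Override public void next(T value) {
                downstream.next(value);
            }
            @Override public void error(Throwable e) {
                if (onError != null) onError.accept(e);
                downstream.error(e);
            }
            @Override public void complete() {
                if (onComplete != null) onComplete.run();
                downstream.complete();
            }
        }));
    }

    // Public wrappers validate their own argument and leave the other slot null.
    MiniSource<T> doOnError(Consumer<? super Throwable> onError) {
        Objects.requireNonNull(onError, "onError");
        return doOnSignal(this, onError, null);
    }

    MiniSource<T> doOnComplete(Runnable onComplete) {
        Objects.requireNonNull(onComplete, "onComplete");
        return doOnSignal(this, null, onComplete);
    }
}
```

Running `main` logs a completing source and then a failing one. In both logs the hook entry appears just before the matching downstream entry, and the hook for the other signal type never fires.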
Code example source: reactor/reactor-core
/**
 * Add behavior (side-effect) triggered when the {@link Flux} completes successfully.
 * <p>
 * <img class="marble" src="doc-files/marbles/doOnComplete.svg" alt="">
 *
 * @param onComplete the callback to call on {@link Subscriber#onComplete}
 *
 * @return an observed {@link Flux}
 */
public final Flux<T> doOnComplete(Runnable onComplete) {
    Objects.requireNonNull(onComplete, "onComplete");
    return doOnSignal(this, null, null, null, onComplete, null, null, null);
}
Code example source: reactor/reactor-core
/**
 * Add behavior (side-effect) triggered after the {@link Flux} terminates, either by completing downstream successfully or with an error.
 * <p>
 * <img class="marble" src="doc-files/marbles/doAfterTerminateForFlux.svg" alt="">
 *
 * @param afterTerminate the callback to call after {@link Subscriber#onComplete} or {@link Subscriber#onError}
 *
 * @return an observed {@link Flux}
 */
public final Flux<T> doAfterTerminate(Runnable afterTerminate) {
    Objects.requireNonNull(afterTerminate, "afterTerminate");
    return doOnSignal(this, null, null, null, null, afterTerminate, null, null);
}
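The `doOnComplete` and `doAfterTerminate` snippets above differ mainly in timing: one hook runs when the terminal signal arrives, the other after it has been passed downstream. The sketch below illustrates that ordering with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+). The `peek` helper and the `TerminateOrderDemo` class are hypothetical names for this example, and the exact ordering inside Reactor is assumed from the Javadoc wording rather than shown in the snippets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class TerminateOrderDemo {
    /** Hypothetical wrapper: one hook runs before the downstream onComplete, one after termination. */
    static <T> Flow.Subscriber<T> peek(Flow.Subscriber<? super T> downstream,
                                       Runnable onComplete,
                                       Runnable afterTerminate) {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                downstream.onSubscribe(s);
            }
            @Override public void onNext(T item) {
                downstream.onNext(item);
            }
            @Override public void onError(Throwable t) {
                downstream.onError(t);
                if (afterTerminate != null) afterTerminate.run();
            }
            @Override public void onComplete() {
                if (onComplete != null) onComplete.run();          // before downstream sees completion
                downstream.onComplete();
                if (afterTerminate != null) afterTerminate.run();  // after downstream has seen it
            }
        };
    }

    /** Drives the wrapper by hand and returns the order in which things happened. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<String> downstream = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { log.add("item " + item); }
            @Override public void onError(Throwable t) { log.add("downstream error"); }
            @Override public void onComplete() { log.add("downstream complete"); }
        };
        Flow.Subscriber<String> observed = peek(downstream,
                () -> log.add("onComplete hook"),
                () -> log.add("afterTerminate hook"));

        // Direct calls stand in for a real publisher; the subscription is a no-op.
        observed.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        observed.onNext("a");
        observed.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the log reads: item, onComplete hook, downstream complete, afterTerminate hook.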
Code example source: reactor/reactor-core
/**
 * Add behavior (side-effect) triggered when the {@link Flux} is cancelled.
 * <p>
 * <img class="marble" src="doc-files/marbles/doOnCancelForFlux.svg" alt="">
 *
 * @param onCancel the callback to call on {@link Subscription#cancel}
 *
 * @return an observed {@link Flux}
 */
public final Flux<T> doOnCancel(Runnable onCancel) {
    Objects.requireNonNull(onCancel, "onCancel");
    return doOnSignal(this, null, null, null, null, null, null, onCancel);
}
Code example source: reactor/reactor-core
/**
 * Add behavior (side-effect) triggered when the {@link Flux} is subscribed.
 * <p>
 * This method is <strong>not</strong> intended for capturing the subscription and calling its methods,
 * but for side effects like monitoring. For instance, the correct way to cancel a subscription is
 * to call {@link Disposable#dispose()} on the Disposable returned by {@link Flux#subscribe()}.
 * <p>
 * <img class="marble" src="doc-files/marbles/doOnSubscribe.svg" alt="">
 *
 * @param onSubscribe the callback to call on {@link Subscriber#onSubscribe}
 *
 * @return an observed {@link Flux}
 */
public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
    Objects.requireNonNull(onSubscribe, "onSubscribe");
    return doOnSignal(this, onSubscribe, null, null, null, null, null, null);
}
Code example source: reactor/reactor-core
/**
 * Add behavior (side-effect) triggering a {@link LongConsumer} when this {@link Flux}
 * receives any request.
 * <p>
 * Note that non fatal error raised in the callback will not be propagated and
 * will simply trigger {@link Operators#onOperatorError(Throwable, Context)}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/doOnRequestForFlux.svg" alt="">
 *
 * @param consumer the consumer to invoke on each request
 *
 * @return an observed {@link Flux}
 */
public final Flux<T> doOnRequest(LongConsumer consumer) {
    Objects.requireNonNull(consumer, "consumer");
    return doOnSignal(this, null, null, null, null, null, consumer, null);
}
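`doOnRequest` (and `doOnCancel` earlier) observe signals that travel from the subscriber toward the source. One way to picture such a hook is a wrapper around the subscription, sketched here with the JDK's `Flow.Subscription` (Java 9+), which declares the same `request(long)` and `cancel()` methods as the Reactive Streams `Subscription`. The `peek` helper and the hook-before-forward order are assumptions of this sketch, not a copy of Reactor's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.LongConsumer;

final class RequestPeekDemo {
    /** Hypothetical wrapper: runs the optional hook, then forwards the call upstream. */
    static Flow.Subscription peek(Flow.Subscription upstream,
                                  LongConsumer onRequest,
                                  Runnable onCancel) {
        Objects.requireNonNull(upstream, "upstream");
        return new Flow.Subscription() {
            @Override public void request(long n) {
                if (onRequest != null) onRequest.accept(n);
                upstream.request(n);
            }
            @Override public void cancel() {
                if (onCancel != null) onCancel.run();
                upstream.cancel();
            }
        };
    }

    /** Issues one request and one cancel through the wrapper and returns the call order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // Stand-in for the subscription handed out by a real source.
        Flow.Subscription source = new Flow.Subscription() {
            @Override public void request(long n) { log.add("source.request(" + n + ")"); }
            @Override public void cancel() { log.add("source.cancel"); }
        };
        Flow.Subscription observed = peek(source,
                n -> log.add("hook saw request " + n),
                () -> log.add("hook saw cancel"));
        observed.request(3);
        observed.cancel();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Passing `null` for either hook leaves that call as a plain pass-through, mirroring the `null` slots in the wrappers above.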
Code example source: reactor/reactor-core
/**
 * Add behavior (side-effect) triggered when the {@link Flux} emits an item.
 * <p>
 * <img class="marble" src="doc-files/marbles/doOnNextForFlux.svg" alt="">
 *
 * @param onNext the callback to call on {@link Subscriber#onNext}
 *
 * @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors}
 * (including when fusion is enabled). Exceptions thrown by the consumer are passed to
 * the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer
 * is not invoked, as the source element will be part of the sequence). The onNext
 * signal is then propagated as normal.
 *
 * @return an observed {@link Flux}
 */
public final Flux<T> doOnNext(Consumer<? super T> onNext) {
    Objects.requireNonNull(onNext, "onNext");
    return doOnSignal(this, null, onNext, null, null, null, null, null);
}
Code example source: reactor/reactor-core
/**
 * Add behavior (side-effect) triggered when the {@link Flux} terminates, either by
 * completing successfully or with an error.
 * <p>
 * <img class="marble" src="doc-files/marbles/doOnTerminateForFlux.svg" alt="">
 *
 * @param onTerminate the callback to call on {@link Subscriber#onComplete} or {@link Subscriber#onError}
 *
 * @return an observed {@link Flux}
 */
public final Flux<T> doOnTerminate(Runnable onTerminate) {
    Objects.requireNonNull(onTerminate, "onTerminate");
    return doOnSignal(this,
            null,
            null,
            e -> onTerminate.run(),
            onTerminate,
            null,
            null,
            null);
}
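In `doOnTerminate` above, a single `Runnable` ends up in two slots: it is passed as-is for completion and wrapped as `e -> onTerminate.run()` for errors, so the error itself is ignored. Below is a small sketch of that adaptation using plain `java.util.function` types. `TerminalHooks` and `forTerminate` are hypothetical names introduced for illustration.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class TerminateAdapterDemo {
    /** Hypothetical holder for the two terminal hook slots. */
    static final class TerminalHooks {
        final Consumer<? super Throwable> onError;
        final Runnable onComplete;

        TerminalHooks(Consumer<? super Throwable> onError, Runnable onComplete) {
            this.onError = onError;
            this.onComplete = onComplete;
        }
    }

    /** Fills both slots from one Runnable; the error slot discards the Throwable. */
    static TerminalHooks forTerminate(Runnable onTerminate) {
        Objects.requireNonNull(onTerminate, "onTerminate");
        return new TerminalHooks(e -> onTerminate.run(), onTerminate);
    }

    /** Fires each slot once and returns how many times the Runnable ran. */
    static int demo() {
        AtomicInteger calls = new AtomicInteger();
        TerminalHooks hooks = forTerminate(calls::incrementAndGet);
        hooks.onComplete.run();                                   // as if one sequence completed
        hooks.onError.accept(new IllegalStateException("boom"));  // as if another sequence failed
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A real sequence terminates only once, so in practice only one of the two slots fires per subscription. The demo triggers both only to show that they reach the same `Runnable`.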
Code example source: reactor/reactor-core
scenario(f -> Flux.doOnSignal(f, null, null, s -> {
    if (s.getMessage()
         .equals(exception().getMessage())) {
scenario(f -> Flux.doOnSignal(f, null, null, s -> {
    if (s.getMessage()
         .equals(exception().getMessage())) {
scenario(f -> Flux.doOnSignal(f, null, null, s -> {
    if (s.getMessage()
         .equals(exception().getMessage())) {
Code example source: reactor/reactor-core
})),
scenario(f -> Flux.doOnSignal(f, null, null, s -> {
    if (s.getMessage()
         .equals(exception().getMessage())) {
}, null, null)).producerEmpty(),
scenario(f -> Flux.doOnSignal(f, null, null, s -> {
    if (s.getMessage()
         .equals(exception().getMessage())) {