本文整理了Java中reactor.core.publisher.Flux.onAssembly()
方法的一些代码示例,展示了Flux.onAssembly()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.onAssembly()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:onAssembly
[英]To be used by custom operators: invokes assembly Hooks pointcut given a ConnectableFlux, potentially returning a new ConnectableFlux. This is for example useful to activate cross-cutting concerns at assembly time, eg. a generalized #checkpoint().
[中]由自定义运算符使用:调用给定ConnectableFlux的程序集钩子切入点,可能返回新的ConnectableFlux。例如,这对于在组装时激活横切关注点非常有用,例如,广义的#checkpoint()。
代码示例来源:origin: resilience4j/resilience4j
public static <T> Flux<T> onAssembly(Flux<T> source) {
return Flux.onAssembly(source);
}
代码示例来源:origin: reactor/reactor-core
final <R> Flux<R> flatMapSequential(Function<? super T, ? extends
Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency,
int prefetch) {
return onAssembly(new FluxMergeSequential<>(this, mapper, maxConcurrency,
prefetch, delayError ? FluxConcatMap.ErrorMode.END :
FluxConcatMap.ErrorMode.IMMEDIATE));
}
代码示例来源:origin: reactor/reactor-core
/**
* Switch to an alternative {@link Publisher} if this sequence is completed without any data.
* <p>
* <img class="marble" src="doc-files/marbles/switchIfEmptyForFlux.svg" alt="">
*
* @param alternate the alternative {@link Publisher} if this sequence is empty
*
* @return a new {@link Flux} that falls back on a {@link Publisher} if source is empty
*/
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate) {
return onAssembly(new FluxSwitchIfEmpty<>(this, alternate));
}
代码示例来源:origin: reactor/reactor-core
/**
* Provide a default unique value if this sequence is completed without any data
* <p>
* <img class="marble" src="doc-files/marbles/defaultIfEmpty.svg" alt="">
*
* @param defaultV the alternate value if this sequence is empty
*
* @return a new {@link Flux}
*/
public final Flux<T> defaultIfEmpty(T defaultV) {
return onAssembly(new FluxDefaultIfEmpty<>(this, defaultV));
}
代码示例来源:origin: reactor/reactor-core
private final <U, V> Flux<T> timeout(Publisher<U> firstTimeout,
Function<? super T, ? extends Publisher<V>> nextTimeoutFactory,
String timeoutDescription) {
return onAssembly(new FluxTimeout<>(this, firstTimeout, nextTimeoutFactory, timeoutDescription));
}
代码示例来源:origin: reactor/reactor-core
/**
* Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail
* index (zero based).
* <p>
* Each group can be consumed only once; requests and cancellation compose through.
* Note that cancelling only one rail may result in undefined behavior.
*
* @return the new Flux instance
*/
public final Flux<GroupedFlux<Integer, T>> groups() {
return Flux.onAssembly(new ParallelGroup<>(this));
}
代码示例来源:origin: reactor/reactor-core
/**
* Create a new {@link Flux} that will only emit a single element then onComplete.
* <p>
* <img class="marble" src="doc-files/marbles/just.svg" alt="">
*
* @param data the single element to emit
* @param <T> the emitted data type
*
* @return a new {@link Flux}
*/
public static <T> Flux<T> just(T data) {
return onAssembly(new FluxJust<>(data));
}
代码示例来源:origin: reactor/reactor-core
/**
* Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
*
* <p>
* <img class="marble" src="doc-files/marbles/repeatWithPredicateForFlux.svg" alt="">
*
* @param predicate the boolean to evaluate on onComplete.
*
* @return a {@link Flux} that repeats on onComplete while the predicate matches
*/
public final Flux<T> repeat(BooleanSupplier predicate) {
return onAssembly(new FluxRepeatPredicate<>(this, predicate));
}
代码示例来源:origin: reactor/reactor-core
/**
* Create a {@link Flux} that emits the items contained in the provided {@link Iterable}.
* A new iterator will be created for each subscriber.
* <p>
* <img class="marble" src="doc-files/marbles/fromIterable.svg" alt="">
*
* @param it the {@link Iterable} to read data from
* @param <T> The type of values in the source {@link Iterable} and resulting Flux
*
* @return a new {@link Flux}
*/
public static <T> Flux<T> fromIterable(Iterable<? extends T> it) {
return onAssembly(new FluxIterable<>(it));
}
代码示例来源:origin: reactor/reactor-core
/**
* Relay values from this {@link Flux} until the given {@link Publisher} emits.
*
* <p>
* <img class="marble" src="doc-files/marbles/takeUntilOtherForFlux.svg" alt="">
*
* @param other the companion {@link Publisher} that signals when to stop taking values from this {@link Flux}
*
* @return a new {@link Flux} limited by a companion {@link Publisher}
*
*/
public final Flux<T> takeUntilOther(Publisher<?> other) {
return onAssembly(new FluxTakeUntilOther<>(this, other));
}
代码示例来源:origin: reactor/reactor-core
/**
* Request an unbounded demand and push to the returned {@link Flux}, or drop
* the observed elements if not enough demand is requested downstream.
*
* <p>
* <img class="marble" src="doc-files/marbles/onBackpressureDrop.svg" alt="">
*
* @reactor.discard This operator discards elements that it drops.
*
* @return a backpressured {@link Flux} that drops overflowing elements
*/
public final Flux<T> onBackpressureDrop() {
return onAssembly(new FluxOnBackpressureDrop<>(this));
}
代码示例来源:origin: reactor/reactor-core
/**
* Subscribe to a returned fallback publisher when any error occurs, using a function to
* choose the fallback depending on the error.
*
* <p>
* <img class="marble" src="doc-files/marbles/onErrorResumeForFlux.svg" alt="">
*
* @param fallback the function to choose the fallback to an alternative {@link Publisher}
*
* @return a {@link Flux} falling back upon source onError
*/
public final Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) {
return onAssembly(new FluxOnErrorResume<>(this, fallback));
}
代码示例来源:origin: reactor/reactor-core
/**
* Re-subscribes to this {@link Flux} sequence if it signals any error
* that matches the given {@link Predicate}, otherwise push the error downstream.
*
* <p>
* <img class="marble" src="doc-files/marbles/retryWithPredicateForFlux.svg" alt="">
*
* @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
*
* @return a {@link Flux} that retries on onError if the predicates matches.
*/
public final Flux<T> retry(Predicate<? super Throwable> retryMatcher) {
return onAssembly(new FluxRetryPredicate<>(this, retryMatcher));
}
代码示例来源:origin: reactor/reactor-core
/**
* Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
*
* <p>
* <img class="marble" src="doc-files/marbles/repeatWithPredicateForMono.svg" alt="">
*
* @param predicate the boolean to evaluate on onComplete.
*
* @return a {@link Flux} that repeats on onComplete while the predicate matches
*
*/
public final Flux<T> repeat(BooleanSupplier predicate) {
return Flux.onAssembly(new MonoRepeatPredicate<>(this, predicate));
}
代码示例来源:origin: reactor/reactor-core
/**
* Request an unbounded demand and push to the returned {@link Flux}, or only keep
* the most recent observed item if not enough demand is requested downstream.
*
* <p>
* <img class="marble" src="doc-files/marbles/onBackpressureLatest.svg" alt="">
* <p>
* @reactor.discard Each time a new element comes in (the new "latest"), this operator
* discards the previously retained element.
*
* @return a backpressured {@link Flux} that will only keep a reference to the last observed item
*/
public final Flux<T> onBackpressureLatest() {
return onAssembly(new FluxOnBackpressureLatest<>(this));
}
代码示例来源:origin: reactor/reactor-core
/**
* Merges the values from each 'rail' in a round-robin or same-order fashion and
* exposes it as a regular Publisher sequence, running with a give prefetch value for
* the rails.
*
* @param prefetch the prefetch amount to use for each rail
*
* @return the new Flux instance
*/
public final Flux<T> sequential(int prefetch) {
return Flux.onAssembly(new ParallelMergeSequential<>(this,
prefetch,
Queues.get(prefetch)));
}
代码示例来源:origin: reactor/reactor-core
final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends
V>> mapper, boolean delayError, int concurrency, int prefetch) {
return onAssembly(new FluxFlatMap<>(
this,
mapper,
delayError,
concurrency,
Queues.get(concurrency),
prefetch,
Queues.get(prefetch)
));
}
代码示例来源:origin: reactor/reactor-core
static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
boolean delayError, int maxConcurrency, int prefetch) {
return onAssembly(new FluxMergeSequential<>(from(sources),
identityFunction(),
maxConcurrency, prefetch, delayError ? FluxConcatMap.ErrorMode.END :
FluxConcatMap.ErrorMode.IMMEDIATE));
}
代码示例来源:origin: reactor/reactor-core
static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
boolean delayError, int maxConcurrency, int prefetch) {
return onAssembly(new FluxMergeSequential<>(new FluxIterable<>(sources),
identityFunction(), maxConcurrency, prefetch,
delayError ? FluxConcatMap.ErrorMode.END : FluxConcatMap.ErrorMode.IMMEDIATE));
}
代码示例来源:origin: reactor/reactor-core
@SafeVarargs
static <I> Flux<I> mergeSequential(int prefetch, boolean delayError,
Publisher<? extends I>... sources) {
if (sources.length == 0) {
return empty();
}
if (sources.length == 1) {
return from(sources[0]);
}
return onAssembly(new FluxMergeSequential<>(new FluxArray<>(sources),
identityFunction(), sources.length, prefetch,
delayError ? FluxConcatMap.ErrorMode.END : FluxConcatMap.ErrorMode.IMMEDIATE));
}
内容来源于网络,如有侵权,请联系作者删除!