Usage and code examples for the reactor.core.publisher.Flux.onAssembly() method

Reposted by x33g5p2x on 2022-01-19, filed under Other

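This article collects code examples for the Java method reactor.core.publisher.Flux.onAssembly() and shows how Flux.onAssembly() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of the Flux.onAssembly() method:
Package: reactor.core.publisher
Class name: Flux
Method name: onAssembly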

About Flux.onAssembly

To be used by custom operators: invokes the assembly Hooks pointcut given a ConnectableFlux, potentially returning a new ConnectableFlux. This is useful, for example, to activate cross-cutting concerns at assembly time, e.g. a generalized #checkpoint().
Here "assembly" means the moment an operator chain is built, before anything subscribes to it. The wording above is the Javadoc of the ConnectableFlux overload; the examples below all call the overload that takes a Flux and returns a Flux.
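To make the assembly-time pointcut concrete, here is a minimal sketch of the pattern in plain Java. It is not Reactor code: Seq, JustSeq, MapSeq, setAssemblyHook, and resetAssemblyHook are hypothetical names invented for this illustration, and the real Hooks machinery is considerably more involved. The sketch only assumes what the description above states, namely that every operator passes its freshly built instance through onAssembly, which may return it unchanged or swap in a replacement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of an assembly-time hook. Not Reactor code.
final class AssemblyHookSketch {

    /** Minimal stand-in for Flux: a sequence that is assembled first, collected later. */
    abstract static class Seq<T> {
        // Global hook applied to every operator when it is constructed; null means none.
        private static volatile UnaryOperator<Seq<?>> assemblyHook;

        static void setAssemblyHook(UnaryOperator<Seq<?>> hook) { assemblyHook = hook; }

        static void resetAssemblyHook() { assemblyHook = null; }

        /** Passes a freshly built operator through the hook, which may replace it. */
        @SuppressWarnings("unchecked")
        protected static <T> Seq<T> onAssembly(Seq<T> source) {
            UnaryOperator<Seq<?>> hook = assemblyHook;
            return hook == null ? source : (Seq<T>) hook.apply(source);
        }

        // Every factory and operator funnels its result through onAssembly.
        static <T> Seq<T> just(List<T> items) {
            return onAssembly(new JustSeq<T>(items));
        }

        final <R> Seq<R> map(Function<? super T, ? extends R> mapper) {
            return onAssembly(new MapSeq<T, R>(this, mapper));
        }

        /** Stands in for subscription: data flows only when this is called. */
        abstract List<T> collect();
    }

    static final class JustSeq<T> extends Seq<T> {
        private final List<T> items;

        JustSeq(List<T> items) { this.items = new ArrayList<>(items); }

        @Override
        List<T> collect() { return new ArrayList<>(items); }
    }

    static final class MapSeq<T, R> extends Seq<R> {
        private final Seq<T> source;
        private final Function<? super T, ? extends R> mapper;

        MapSeq(Seq<T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        List<R> collect() {
            List<R> out = new ArrayList<>();
            for (T item : source.collect()) {
                out.add(mapper.apply(item));
            }
            return out;
        }
    }

    /** Installs a hook that records each assembled operator, then builds a chain. */
    public static List<String> assembledOperatorNames() {
        List<String> names = new ArrayList<>();
        Seq.setAssemblyHook(seq -> {
            names.add(seq.getClass().getSimpleName());
            return seq;
        });
        try {
            // Building the chain triggers the hook twice; collect() is never called.
            Seq.just(Arrays.asList("a", "bb")).map(String::length);
        } finally {
            Seq.resetAssemblyHook();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(assembledOperatorNames());
        System.out.println(Seq.just(Arrays.asList("a", "bb")).map(String::length).collect());
    }
}
```

The hook runs while the chain is being built, not while data flows, which is why it suits cross-cutting concerns such as recording where an operator was created. Returning a different instance from the hook, rather than the one passed in, is how a wrapper could be swapped in for the original operator.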

Code examples

Code example source: resilience4j/resilience4j

public static <T> Flux<T> onAssembly(Flux<T> source) {
  return Flux.onAssembly(source);
}

Code example source: reactor/reactor-core

final <R> Flux<R> flatMapSequential(Function<? super T, ? extends
    Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency,
    int prefetch) {
  return onAssembly(new FluxMergeSequential<>(this, mapper, maxConcurrency,
      prefetch, delayError ? FluxConcatMap.ErrorMode.END :
      FluxConcatMap.ErrorMode.IMMEDIATE));
}

Code example source: reactor/reactor-core

/**
 * Switch to an alternative {@link Publisher} if this sequence is completed without any data.
 * <p>
 * <img class="marble" src="doc-files/marbles/switchIfEmptyForFlux.svg" alt="">
 *
 * @param alternate the alternative {@link Publisher} if this sequence is empty
 *
 * @return a new {@link Flux} that falls back on a {@link Publisher} if source is empty
 */
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate) {
  return onAssembly(new FluxSwitchIfEmpty<>(this, alternate));
}

Code example source: reactor/reactor-core

/**
 * Provide a default unique value if this sequence is completed without any data
 * <p>
 * <img class="marble" src="doc-files/marbles/defaultIfEmpty.svg" alt="">
 *
 * @param defaultV the alternate value if this sequence is empty
 *
 * @return a new {@link Flux}
 */
public final Flux<T> defaultIfEmpty(T defaultV) {
  return onAssembly(new FluxDefaultIfEmpty<>(this, defaultV));
}

Code example source: reactor/reactor-core

private final <U, V> Flux<T> timeout(Publisher<U> firstTimeout,
    Function<? super T, ? extends Publisher<V>> nextTimeoutFactory,
    String timeoutDescription) {
  return onAssembly(new FluxTimeout<>(this, firstTimeout, nextTimeoutFactory, timeoutDescription));
}

Code example source: reactor/reactor-core (ParallelFlux class)

/**
 * Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail
 * index (zero based).
 * <p>
 * Each group can be consumed only once; requests and cancellation compose through.
 * Note that cancelling only one rail may result in undefined behavior.
 *
 * @return the new Flux instance
 */
public final Flux<GroupedFlux<Integer, T>> groups() {
  return Flux.onAssembly(new ParallelGroup<>(this));
}

Code example source: reactor/reactor-core

/**
 * Create a new {@link Flux} that will only emit a single element then onComplete.
 * <p>
 * <img class="marble" src="doc-files/marbles/just.svg" alt="">
 *
 * @param data the single element to emit
 * @param <T> the emitted data type
 *
 * @return a new {@link Flux}
 */
public static <T> Flux<T> just(T data) {
  return onAssembly(new FluxJust<>(data));
}

Code example source: reactor/reactor-core

/**
 * Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/repeatWithPredicateForFlux.svg" alt="">
 *
 * @param predicate the boolean to evaluate on onComplete.
 *
 * @return a {@link Flux} that repeats on onComplete while the predicate matches
 */
public final Flux<T> repeat(BooleanSupplier predicate) {
  return onAssembly(new FluxRepeatPredicate<>(this, predicate));
}

Code example source: reactor/reactor-core

/**
 * Create a {@link Flux} that emits the items contained in the provided {@link Iterable}.
 * A new iterator will be created for each subscriber.
 * <p>
 * <img class="marble" src="doc-files/marbles/fromIterable.svg" alt="">
 *
 * @param it the {@link Iterable} to read data from
 * @param <T> The type of values in the source {@link Iterable} and resulting Flux
 *
 * @return a new {@link Flux}
 */
public static <T> Flux<T> fromIterable(Iterable<? extends T> it) {
  return onAssembly(new FluxIterable<>(it));
}

Code example source: reactor/reactor-core

/**
 * Relay values from this {@link Flux} until the given {@link Publisher} emits.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/takeUntilOtherForFlux.svg" alt="">
 *
 * @param other the companion {@link Publisher} that signals when to stop taking values from this {@link Flux}
 *
 * @return a new {@link Flux} limited by a companion {@link Publisher}
 *
 */
public final Flux<T> takeUntilOther(Publisher<?> other) {
  return onAssembly(new FluxTakeUntilOther<>(this, other));
}

Code example source: reactor/reactor-core

/**
 * Request an unbounded demand and push to the returned {@link Flux}, or drop
 * the observed elements if not enough demand is requested downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/onBackpressureDrop.svg" alt="">
 *
 * @reactor.discard This operator discards elements that it drops.
 *
 * @return a backpressured {@link Flux} that drops overflowing elements
 */
public final Flux<T> onBackpressureDrop() {
  return onAssembly(new FluxOnBackpressureDrop<>(this));
}

Code example source: reactor/reactor-core

/**
 * Subscribe to a returned fallback publisher when any error occurs, using a function to
 * choose the fallback depending on the error.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorResumeForFlux.svg" alt="">
 *
 * @param fallback the function to choose the fallback to an alternative {@link Publisher}
 *
 * @return a {@link Flux} falling back upon source onError
 */
public final Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) {
  return onAssembly(new FluxOnErrorResume<>(this, fallback));
}

Code example source: reactor/reactor-core

/**
 * Re-subscribes to this {@link Flux} sequence if it signals any error
 * that matches the given {@link Predicate}, otherwise push the error downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/retryWithPredicateForFlux.svg" alt="">
 *
 * @param retryMatcher the predicate to evaluate if retry should occur based on a given error signal
 *
 * @return a {@link Flux} that retries on onError if the predicate matches.
 */
public final Flux<T> retry(Predicate<? super Throwable> retryMatcher) {
  return onAssembly(new FluxRetryPredicate<>(this, retryMatcher));
}

Code example source: reactor/reactor-core (Mono class)

/**
 * Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/repeatWithPredicateForMono.svg" alt="">
 *
 * @param predicate the boolean to evaluate on onComplete.
 *
 * @return a {@link Flux} that repeats on onComplete while the predicate matches
 *
 */
public final Flux<T> repeat(BooleanSupplier predicate) {
  return Flux.onAssembly(new MonoRepeatPredicate<>(this, predicate));
}

Code example source: reactor/reactor-core

/**
 * Request an unbounded demand and push to the returned {@link Flux}, or only keep
 * the most recent observed item if not enough demand is requested downstream.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/onBackpressureLatest.svg" alt="">
 * <p>
 * @reactor.discard Each time a new element comes in (the new "latest"), this operator
 * discards the previously retained element.
 *
 * @return a backpressured {@link Flux} that will only keep a reference to the last observed item
 */
public final Flux<T> onBackpressureLatest() {
  return onAssembly(new FluxOnBackpressureLatest<>(this));
}

Code example source: reactor/reactor-core (ParallelFlux class)

/**
 * Merges the values from each 'rail' in a round-robin or same-order fashion and
 * exposes it as a regular Publisher sequence, running with a given prefetch value for
 * the rails.
 *
 * @param prefetch the prefetch amount to use for each rail
 *
 * @return the new Flux instance
 */
public final Flux<T> sequential(int prefetch) {
  return Flux.onAssembly(new ParallelMergeSequential<>(this,
      prefetch,
      Queues.get(prefetch)));
}

Code example source: reactor/reactor-core

final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends
    V>> mapper, boolean delayError, int concurrency, int prefetch) {
  return onAssembly(new FluxFlatMap<>(
      this,
      mapper,
      delayError,
      concurrency,
      Queues.get(concurrency),
      prefetch,
      Queues.get(prefetch)
  ));
}

Code example source: reactor/reactor-core

static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
    boolean delayError, int maxConcurrency, int prefetch) {
  return onAssembly(new FluxMergeSequential<>(from(sources),
      identityFunction(),
      maxConcurrency, prefetch, delayError ? FluxConcatMap.ErrorMode.END :
      FluxConcatMap.ErrorMode.IMMEDIATE));
}

Code example source: reactor/reactor-core

static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
    boolean delayError, int maxConcurrency, int prefetch) {
  return onAssembly(new FluxMergeSequential<>(new FluxIterable<>(sources),
      identityFunction(), maxConcurrency, prefetch,
      delayError ? FluxConcatMap.ErrorMode.END : FluxConcatMap.ErrorMode.IMMEDIATE));
}

Code example source: reactor/reactor-core

@SafeVarargs
static <I> Flux<I> mergeSequential(int prefetch, boolean delayError,
    Publisher<? extends I>... sources) {
  if (sources.length == 0) {
    return empty();
  }
  if (sources.length == 1) {
    return from(sources[0]);
  }
  return onAssembly(new FluxMergeSequential<>(new FluxArray<>(sources),
      identityFunction(), sources.length, prefetch,
      delayError ? FluxConcatMap.ErrorMode.END : FluxConcatMap.ErrorMode.IMMEDIATE));
}
