Usage and code examples of the reactor.core.publisher.Flux.identityFunction() method

Reposted by x33g5p2x on 2022-01-19 in Other

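This article collects code examples of the reactor.core.publisher.Flux.identityFunction() method in Java and shows how Flux.identityFunction() is used. The examples come mainly from platforms such as GitHub, Stack Overflow and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.identityFunction() method are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: identityFunction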

Introduction to Flux.identityFunction

None available.
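
Since no description is given here, a rough idea of the method may help. Judging from the reactor-core excerpts on this page, identityFunction() supplies a Function that hands back its argument unchanged, and it is called without qualification inside Flux or from tests in the same package, which suggests it is an internal helper rather than public API. Code outside that package would presumably use the JDK's Function.identity() instead. The following is a minimal plain-JDK sketch of such a helper, not Reactor's actual implementation; the class name IdentityIntro and the cached-constant approach are assumptions made for illustration.

```java
import java.util.function.Function;

final class IdentityIntro {
    // Assumption: one shared, stateless instance can serve every type parameter.
    @SuppressWarnings("rawtypes")
    private static final Function IDENTITY = Function.identity();

    // Hypothetical helper: returns the shared identity function, typed for the caller.
    @SuppressWarnings("unchecked")
    static <T> Function<T, T> identityFunction() {
        return (Function<T, T>) IDENTITY;
    }

    public static void main(String[] args) {
        Function<String, String> f = identityFunction();
        // The very same object that was passed in comes back.
        System.out.println(f.apply("reactor"));
    }
}
```

The generic method exists only so that callers get a correctly typed Function without writing a cast themselves.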

Code examples

Code example source: reactor/reactor-core

/**
 * Collect all elements emitted by this {@link Flux} into a {@link Map multimap} that is
 * emitted by the resulting {@link Mono} when this sequence completes.
 * The key is extracted from each element by applying the {@code keyExtractor}
 * {@link Function}, and every element mapping to the same key is stored in the {@link List}
 * associated to said key.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/collectMultiMapWithKeyExtractor.svg" alt="">
 *
 * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
 *
 * @param <K> the type of the key extracted from each source element
 * @return a {@link Mono} of a {@link Map} of key-List(elements) pairs
 */
public final <K> Mono<Map<K, Collection<T>>> collectMultimap(Function<? super T, ? extends K> keyExtractor) {
  return collectMultimap(keyExtractor, identityFunction());
}
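
The overload above delegates to the two-argument collectMultimap and passes identityFunction() as the value extractor, so each element is stored unchanged under its key. The following is a minimal plain-JDK sketch of that delegation pattern, not Reactor code: MultimapSketch and its multimap methods are hypothetical names introduced for illustration, and a List stands in for the Flux.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class MultimapSketch {
    // Two-argument form: both the key and the stored value are derived from each element.
    static <T, K, V> Map<K, Collection<V>> multimap(List<T> source,
            Function<? super T, ? extends K> keyExtractor,
            Function<? super T, ? extends V> valueExtractor) {
        Map<K, Collection<V>> result = new HashMap<>();
        for (T element : source) {
            result.computeIfAbsent(keyExtractor.apply(element), k -> new ArrayList<>())
                  .add(valueExtractor.apply(element));
        }
        return result;
    }

    // One-argument form: delegate with an identity value extractor,
    // so the elements themselves end up in the per-key collections.
    static <T, K> Map<K, Collection<T>> multimap(List<T> source,
            Function<? super T, ? extends K> keyExtractor) {
        return multimap(source, keyExtractor, Function.identity());
    }
}
```

Grouping the strings "a", "bb" and "cc" by String::length with this sketch puts "a" under key 1 and both "bb" and "cc" under key 2.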

Code example source: reactor/reactor-core

/**
 * Collect all elements emitted by this {@link Flux} into a hashed {@link Map} that is
 * emitted by the resulting {@link Mono} when this sequence completes.
 * The key is extracted from each element by applying the {@code keyExtractor}
 * {@link Function}. In case several elements map to the same key, the associated value
 * will be the most recently emitted element.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/collectMapWithKeyExtractor.svg" alt="">
 *
 * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
 * @param <K> the type of the key extracted from each source element
 *
 * @return a {@link Mono} of a {@link Map} of key-element pairs (only including latest
 * element in case of key conflicts)
 */
public final <K> Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor) {
  return collectMap(keyExtractor, identityFunction());
}
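
Here too the value extractor is the identity function, and the Javadoc states that on a key conflict the most recently emitted element wins. A small plain-JDK sketch of that behaviour follows, with a List standing in for the Flux. CollectMapSketch is a hypothetical name, and the sketch only mirrors the documented semantics rather than Reactor's internals.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class CollectMapSketch {
    static <T, K> Map<K, T> collectMap(List<T> source,
            Function<? super T, ? extends K> keyExtractor) {
        // Identity value extractor: the element itself becomes the map value.
        Function<T, T> valueExtractor = Function.identity();
        Map<K, T> result = new HashMap<>();
        for (T element : source) {
            // put() overwrites any earlier value, so the latest element per key is kept.
            result.put(keyExtractor.apply(element), valueExtractor.apply(element));
        }
        return result;
    }
}
```

For the inputs "a", "bb" and "cc" keyed by length, key 2 ends up mapped to "cc" because it arrives after "bb".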

Code example source: reactor/reactor-core

/**
 * Divide this sequence into dynamically created {@link Flux} (or groups) for each
 * unique key, as produced by the provided keyMapper {@link Function}. Note that
 * groupBy works best with a low cardinality of groups, so choose your keyMapper
 * function accordingly.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/groupByWithKeyMapper.svg" alt="">
 *
 * <p>
 * The groups need to be drained and consumed downstream for groupBy to work correctly.
 * Notably when the criteria produces a large amount of groups, it can lead to hanging
 * if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
 * with a {@code maxConcurrency} parameter that is set too low).
 *
 * @param keyMapper the key mapping {@link Function} that evaluates an incoming data and returns a key.
 * @param <K> the key type extracted from each value of this sequence
 *
 * @return a {@link Flux} of {@link GroupedFlux} grouped sequences
 */
public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper) {
  return groupBy(keyMapper, identityFunction());
}

Code example source: reactor/reactor-core

/**
 * Sort elements from this {@link Flux} by collecting and sorting them in the background
 * then emitting the sorted sequence once this sequence completes.
 * Each item emitted by the {@link Flux} must implement {@link Comparable} with
 * respect to all other items in the sequence.
 *
 * <p>Note that calling {@code sort} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}. Use sequence splitting like {@link #window} to sort batches in that case.
 *
 * @throws ClassCastException if any item emitted by the {@link Flux} does not implement
 * {@link Comparable} with respect to all other items emitted by the {@link Flux}
 * @return a sorted {@link Flux}
 */
public final Flux<T> sort(){
  return collectSortedList().flatMapIterable(identityFunction());
}

Code example source: reactor/reactor-core

/**
 * Sort elements from this {@link Flux} using a {@link Comparator} function, by
 * collecting and sorting elements in the background then emitting the sorted sequence
 * once this sequence completes.
 *
 * <p>Note that calling {@code sort} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}.
 *
 * @param sortFunction a function that compares two items emitted by this {@link Flux}
 * to indicate their sort order
 * @return a sorted {@link Flux}
 */
public final Flux<T> sort(Comparator<? super T> sortFunction) {
  return collectSortedList(sortFunction).flatMapIterable(identityFunction());
}
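
Both sort variants follow the same two steps: gather everything into one sorted list, then flatten that single list back into individual elements by using the identity function as the flattening mapper. The sketch below reproduces the idea with java.util.stream as an analogy only. SortSketch is a hypothetical name, and Stream.flatMap plays the role that flatMapIterable plays in the Reactor code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SortSketch {
    static <T> List<T> sort(List<T> source, Comparator<? super T> order) {
        // Step 1: collect and sort (the analogue of collectSortedList).
        List<T> sorted = new ArrayList<>(source);
        sorted.sort(order);

        // Step 2: a one-element stream that holds the sorted elements as a nested stream,
        // flattened with the identity function because the container needs no mapping.
        return Stream.of(sorted.stream())
                     .flatMap(Function.identity())
                     .collect(Collectors.toList());
    }
}
```

Because step 1 has to see every element before anything is emitted, the memory warning in the Javadoc applies to any approach of this shape.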

Code example source: reactor/reactor-core

static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
    boolean delayError, int maxConcurrency, int prefetch) {
  return onAssembly(new FluxMergeSequential<>(from(sources),
      identityFunction(),
      maxConcurrency, prefetch, delayError ? FluxConcatMap.ErrorMode.END :
      FluxConcatMap.ErrorMode.IMMEDIATE));
}

Code example source: reactor/reactor-core

static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
    boolean delayError, int maxConcurrency, int prefetch) {
  return onAssembly(new FluxMergeSequential<>(new FluxIterable<>(sources),
      identityFunction(), maxConcurrency, prefetch,
      delayError ? FluxConcatMap.ErrorMode.END : FluxConcatMap.ErrorMode.IMMEDIATE));
}

Code example source: reactor/reactor-core

@SafeVarargs
static <I> Flux<I> mergeSequential(int prefetch, boolean delayError,
    Publisher<? extends I>... sources) {
  if (sources.length == 0) {
    return empty();
  }
  if (sources.length == 1) {
    return from(sources[0]);
  }
  return onAssembly(new FluxMergeSequential<>(new FluxArray<>(sources),
      identityFunction(), sources.length, prefetch,
      delayError ? FluxConcatMap.ErrorMode.END : FluxConcatMap.ErrorMode.IMMEDIATE));
}

Code example source: reactor/reactor-core

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void mappingBadPrefetch() throws Exception {
  Flux<Integer> source = Flux.just(1);
  try {
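    Flux.just(source, source, source).flatMapSequential(Flux.identityFunction(), 10, -99);
    // fail explicitly, otherwise the test passes silently when no exception is thrown
    throw new AssertionError("expected IllegalArgumentException");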
  } catch (IllegalArgumentException ex) {
    assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
  }
}

Code example source: reactor/reactor-core

@Test
//see https://github.com/reactor/reactor-core/issues/577
public void collectionSupplierLimitedFifo() {
  Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 1, 3, 4, 1, 1, 1, 1, 2);
  StepVerifier.create(flux.distinct(Flux.identityFunction(), () -> new NaiveFifoQueue<>(5)))
        .expectNext(1, 2, 3, 4, 5, 6, 1, 2)
        .verifyComplete();
  StepVerifier.create(flux.distinct(Flux.identityFunction(),
          () -> new NaiveFifoQueue<>(3)))
        .expectNext(1, 2, 3, 4, 5, 6, 1, 3, 4, 2)
        .verifyComplete();
}
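
In this test the identity function serves as the key selector, so each element is its own distinctness key, and the supplied collection limits how many keys are remembered. NaiveFifoQueue is a test helper that is not shown on this page. The sketch below assumes it behaves like a bounded store that rejects keys it currently holds and evicts the oldest key when full, an assumption that is consistent with the two expected sequences in the test. BoundedDistinctSketch is a hypothetical name, and a List stands in for the Flux.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class BoundedDistinctSketch {
    // Lets an element through only if its key is not among the last `capacity` accepted keys.
    static <T, K> List<T> distinct(List<T> source,
            Function<? super T, ? extends K> keySelector, int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        ArrayDeque<K> recentKeys = new ArrayDeque<>();
        List<T> out = new ArrayList<>();
        for (T element : source) {
            K key = keySelector.apply(element);
            if (recentKeys.contains(key)) {
                continue; // key is still remembered: treat the element as a duplicate
            }
            if (recentKeys.size() == capacity) {
                recentKeys.removeFirst(); // forget the oldest remembered key
            }
            recentKeys.addLast(key);
            out.add(element);
        }
        return out;
    }
}
```

With a small capacity, values that were seen long ago are forgotten and can pass again, which is why the expected output in the test contains repeated numbers.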

Code example source: reactor/reactor-core

@Test
public void mainDoneThenError() {
  TestPublisher<Integer> source = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(source.flux()
              .windowWhen(Flux.never(), v -> Mono.just(1))
              .flatMap(Flux.identityFunction()))
        .then(() -> source.complete().error(new IllegalStateException("boom")))
        .expectComplete()
        .verifyThenAssertThat()
        .hasDroppedErrorWithMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void mainDoneThenNext() {
  TestPublisher<Integer> source = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(source.flux()
              .windowWhen(Flux.never(), v -> Mono.just(1))
              .flatMap(Flux.identityFunction()))
        .then(() -> source.complete().next(1))
        .expectComplete()
        .verifyThenAssertThat()
        .hasDropped(1);
}

Code example source: reactor/reactor-core

@Test
public void mainDoneThenComplete() {
  TestPublisher<Integer> source = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(source.flux()
              .windowWhen(Flux.never(), v -> Mono.just(1))
              .flatMap(Flux.identityFunction()))
        .then(() -> source.complete().complete())
        .expectComplete()
        .verifyThenAssertThat()
        .hasNotDroppedErrors()
        .hasNotDroppedElements();
}

Code example source: reactor/reactor-core

@Test
public void startDoneThenError() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  final TestPublisher<Integer> end = TestPublisher.create();
  StepVerifier.create(source.flux()
               .windowWhen(start, v -> end)
               .flatMap(Flux.identityFunction())
  )
        .then(() -> start.error(new IllegalStateException("boom"))
                 .error(new IllegalStateException("boom2")))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDroppedErrorWithMessage("boom2");
  source.assertNoSubscribers();
  //start doesn't cleanup and as such still has a subscriber
  end.assertNoSubscribers();
}

Code example source: reactor/reactor-core

@Test
public void endDoneThenError() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> start = TestPublisher.create();
  TestPublisher<Integer> end = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(source.flux()
               .windowWhen(start, v -> end)
               .flatMap(Flux.identityFunction())
  )
        .then(() -> start.next(1))
        .then(() -> end.error(new IllegalStateException("boom"))
                .error(new IllegalStateException("boom2")))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDroppedErrorWithMessage("boom2");
  source.assertNoSubscribers();
  start.assertNoSubscribers();
  //end doesn't cleanup and as such still has a subscriber
}

Code example source: reactor/reactor-core

@Test
public void startError() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> start = TestPublisher.create();
  final TestPublisher<Integer> end = TestPublisher.create();
  StepVerifier.create(source.flux()
               .windowWhen(start, v -> end)
               .flatMap(Flux.identityFunction())
  )
        .then(() -> start.error(new IllegalStateException("boom")))
        .expectErrorMessage("boom")
        .verify();
  source.assertNoSubscribers();
  start.assertNoSubscribers();
  end.assertNoSubscribers();
}

Code example source: reactor/reactor-core

@Test
public void startDoneThenComplete() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  final TestPublisher<Integer> end = TestPublisher.create();
  StepVerifier.create(source.flux()
               .windowWhen(start, v -> end)
               .flatMap(Flux.identityFunction())
  )
        .then(() -> start.error(new IllegalStateException("boom"))
                 .complete())
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasNotDroppedErrors();
  source.assertNoSubscribers();
  //start doesn't cleanup and as such still has a subscriber
  end.assertNoSubscribers();
}

Code example source: reactor/reactor-core

@Test
public void endError() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> start = TestPublisher.create();
  TestPublisher<Integer> end = TestPublisher.create();
  StepVerifier.create(source.flux()
               .windowWhen(start, v -> end)
               .flatMap(Flux.identityFunction())
  )
        .then(() -> start.next(1))
        .then(() -> end.error(new IllegalStateException("boom")))
        .expectErrorMessage("boom")
        .verify();
  source.assertNoSubscribers();
  start.assertNoSubscribers();
  end.assertNoSubscribers();
}

Code example source: reactor/reactor-core

@Test
public void endDoneThenComplete() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> start = TestPublisher.create();
  TestPublisher<Integer> end = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  StepVerifier.create(source.flux()
               .windowWhen(start, v -> end)
               .flatMap(Flux.identityFunction())
  )
        .then(() -> start.next(1))
        .then(() -> end.error(new IllegalStateException("boom"))
                .complete())
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasNotDroppedErrors();
  source.assertNoSubscribers();
  start.assertNoSubscribers();
  //end doesn't cleanup and as such still has a subscriber
}

Code example source: reactor/reactor-core

@Test
public void startDoneThenNext() {
  TestPublisher<Integer> source = TestPublisher.create();
  TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
  final TestPublisher<Integer> end = TestPublisher.create();
  StepVerifier.create(source.flux()
               .windowWhen(start, v -> end)
               .flatMap(Flux.identityFunction())
  )
        .then(() -> start.error(new IllegalStateException("boom"))
                 .next(1))
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasNotDroppedErrors()
        .hasNotDroppedElements();
  source.assertNoSubscribers();
  //start doesn't cleanup and as such still has a subscriber
  end.assertNoSubscribers();
}
