Usage and code examples of the reactor.core.publisher.Flux.collect() method

x33g5p2x, reposted on 2022-01-19, filed under Other

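This article gathers code examples for the Java method reactor.core.publisher.Flux.collect() and shows how Flux.collect() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow and Maven and were extracted from selected open-source projects, so they serve well as reference material. Details of the method:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: collect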

About Flux.collect

Collects all elements emitted by this Flux into a user-defined container by applying a collector BiConsumer that takes the container and each element. The collected result is emitted when the sequence completes.
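The container-plus-accumulator contract described above has the same shape as the JDK's three-argument Stream.collect. The following is a minimal sketch of that shape using only the JDK, so it runs without Reactor on the classpath. The class and method names are hypothetical, and the third argument (the combiner) is something Stream requires but the two-argument Flux.collect does not take.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class CollectContractSketch {

    // Folds every element of the source into one container and returns
    // the container once the source is exhausted.
    static List<Integer> collectSquares(Stream<Integer> source) {
        return source.collect(
                () -> new ArrayList<Integer>(),        // container supplier
                (list, n) -> list.add(n * n),          // accumulator: container + element
                (left, right) -> left.addAll(right));  // combiner: needed by Stream only
    }

    public static void main(String[] args) {
        System.out.println(collectSquares(Stream.of(1, 2, 3))); // [1, 4, 9]
    }
}
```

With Reactor, the first two lambdas would be passed to Flux.collect and the result would arrive as a Mono of the container instead of a direct return value.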

Code examples

Example source: codecentric/spring-boot-admin

@Override
public Mono<Instance> find(InstanceId id) {
  //hmm a simple reduce doesn't return empty when not found...
  return eventStore.find(id).collect((Supplier<AtomicReference<Instance>>) AtomicReference::new, (ref, event) -> {
    Instance instance = ref.get() != null ? ref.get() : Instance.create(id);
    ref.set(instance.apply(event));
  }).flatMap(ref -> Mono.justOrEmpty(ref.get()));
}
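The comment in the excerpt above hints at why an AtomicReference is used as the container: a reference that is never set stays null, which can then be turned into an empty result. The sketch below models that idea with JDK types only. Optional stands in for Mono.justOrEmpty, and a comma-joined string stands in for the Instance aggregate; both substitutions, and all names, are assumptions made for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class EmptyAwareFoldSketch {

    // Hypothetical stand-in for replaying events onto an aggregate: the
    // "aggregate" here is just a comma-joined history of event names.
    static Optional<String> replay(List<String> events) {
        AtomicReference<String> ref = new AtomicReference<>(); // holds null until the first event
        for (String event : events) {
            String history = ref.get() != null ? ref.get() + "," + event : event;
            ref.set(history);
        }
        // Mirrors Mono.justOrEmpty(ref.get()): a null reference becomes "empty".
        return Optional.ofNullable(ref.get());
    }

    public static void main(String[] args) {
        System.out.println(replay(List.of()));                     // Optional.empty
        System.out.println(replay(List.of("created", "updated"))); // Optional[created,updated]
    }
}
```

A fold seeded with a pre-built aggregate would return that seed even when no events exist, which is presumably the behavior the original author wanted to avoid.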

Example source: reactor/reactor-core

/**
 * Collect all elements emitted by this {@link Flux} into a user-defined {@link Map} that is
 * emitted by the resulting {@link Mono} when this sequence completes.
 * The key is extracted from each element by applying the {@code keyExtractor}
 * {@link Function}, and the value is extracted by the {@code valueExtractor} Function.
 * In case several elements map to the same key, the associated value will be derived
 * from the most recently emitted element.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/collectMapWithKeyAndValueExtractors.svg" alt="">
 *
 * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
 * @param valueExtractor a {@link Function} to map elements to a value for the {@link Map}
 * @param mapSupplier a {@link Map} factory called for each {@link Subscriber}
 *
 * @param <K> the type of the key extracted from each source element
 * @param <V> the type of the value extracted from each source element
 *
 * @return a {@link Mono} of a {@link Map} of key-value pairs (only including latest
 * element's value in case of key conflicts)
 */
public final <K, V> Mono<Map<K, V>> collectMap(
    final Function<? super T, ? extends K> keyExtractor,
    final Function<? super T, ? extends V> valueExtractor,
    Supplier<Map<K, V>> mapSupplier) {
  Objects.requireNonNull(keyExtractor, "Key extractor is null");
  Objects.requireNonNull(valueExtractor, "Value extractor is null");
  Objects.requireNonNull(mapSupplier, "Map supplier is null");
  return collect(mapSupplier, (m, d) -> m.put(keyExtractor.apply(d), valueExtractor.apply(d)));
}
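The Javadoc above states that on a key conflict the value from the most recently emitted element wins. That follows directly from the accumulator calling Map.put. The sketch below reproduces the same accumulator over a plain Iterable, using only the JDK; the helper collectMap here is a hypothetical stand-alone function, not the Reactor method.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;

class LastValueWinsSketch {

    // Same accumulator as in the excerpt: put(key(d), value(d)) for each element.
    static <T, K, V> Map<K, V> collectMap(
            Iterable<T> source,
            Function<? super T, ? extends K> keyExtractor,
            Function<? super T, ? extends V> valueExtractor,
            Supplier<Map<K, V>> mapSupplier) {
        Map<K, V> m = mapSupplier.get();
        for (T d : source) {
            m.put(keyExtractor.apply(d), valueExtractor.apply(d)); // later elements overwrite earlier ones
        }
        return m;
    }

    // Keys words by their first letter; "avocado" replaces "apple" under 'a'.
    static Map<Character, String> byFirstLetter(List<String> words) {
        return collectMap(words, w -> w.charAt(0), w -> w, () -> new TreeMap<Character, String>());
    }

    public static void main(String[] args) {
        System.out.println(byFirstLetter(List.of("apple", "avocado", "banana"))); // {a=avocado, b=banana}
    }
}
```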

Example source: reactor/reactor-core

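// Excerpt from the body of collectMultimap; the method signature is not shown in the source.
Objects.requireNonNull(valueExtractor, "Value extractor is null");
Objects.requireNonNull(mapSupplier, "Map supplier is null");
return collect(mapSupplier, (m, d) -> {
  K key = keyExtractor.apply(d);
  Collection<V> values = m.computeIfAbsent(key, k -> new ArrayList<>());
  values.add(valueExtractor.apply(d));
});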

Example source: spring-projects/spring-data-redis

@Override
public Mono<Properties> info() {
  return Flux.merge(executeOnAllNodes(this::info)).collect(PropertiesCollector.INSTANCE);
}

Example source: spring-projects/spring-data-redis

@Override
public Mono<Properties> getConfig(String pattern) {
  Assert.hasText(pattern, "Pattern must not be null or empty!");
  return Flux.merge(executeOnAllNodes(node -> getConfig(node, pattern))) //
      .collect(PropertiesCollector.INSTANCE);
}

Example source: spring-projects/spring-data-redis

@Override
public Mono<Properties> info(String section) {
  Assert.hasText(section, "Section must not be null or empty!");
  return Flux.merge(executeOnAllNodes(redisClusterNode -> info(redisClusterNode, section)))
      .collect(PropertiesCollector.INSTANCE);
}
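The three excerpts above pass a ready-made collector to collect. Flux.collect also has an overload that accepts a java.util.stream.Collector, so such a collector can be written and tested with the JDK alone. The source does not show PropertiesCollector, so the sketch below is a hypothetical collector that merges several Properties objects into one; it is an assumption about what such a collector could look like, not the spring-data-redis implementation.

```java
import java.util.List;
import java.util.Properties;
import java.util.stream.Collector;

class MergingPropertiesCollectorSketch {

    // Hypothetical collector: copies every incoming Properties into one result.
    // Entries from later inputs overwrite entries with the same key.
    static Collector<Properties, Properties, Properties> merging() {
        return Collector.of(
                () -> new Properties(),                                 // result container
                (acc, p) -> acc.putAll(p),                              // fold one input into it
                (left, right) -> { left.putAll(right); return left; }); // merge partial results
    }

    // Exercises the collector with a JDK stream instead of a Flux.
    static Properties merge(List<Properties> perNode) {
        return perNode.stream().collect(merging());
    }

    public static void main(String[] args) {
        Properties a = new Properties();
        a.setProperty("role", "master");
        Properties b = new Properties();
        b.setProperty("maxmemory", "0");
        System.out.println(merge(List.of(a, b)).getProperty("role")); // master
    }
}
```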

Example source: spring-projects/spring-data-redis

@Override
public Mono<Long> delete(Publisher<K> keys) {
  Assert.notNull(keys, "Keys must not be null!");
  return createFlux(connection -> connection.keyCommands() //
      .mDel(Flux.from(keys).map(this::rawKey).buffer(128)) //
      .map(CommandResponse::getOutput)) //
      .collect(Collectors.summingLong(value -> value));
}

Example source: spring-projects/spring-data-redis

@Override
public Mono<Long> unlink(Publisher<K> keys) {
  Assert.notNull(keys, "Keys must not be null!");
  return createFlux(connection -> connection.keyCommands() //
      .mUnlink(Flux.from(keys).map(this::rawKey).buffer(128)) //
      .map(CommandResponse::getOutput)) //
      .collect(Collectors.summingLong(value -> value));
}
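In the delete and unlink excerpts above, the keys appear to be sent in batches of 128, each command response carries a count, and Collectors.summingLong adds those counts into a single Long. The sketch below shows only that final summing step on a JDK stream; the class name, method name and sample counts are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

class SummingCountsSketch {

    // Adds up per-batch counts with the same collector used in the excerpts.
    static Long total(List<Long> perBatchCounts) {
        return perBatchCounts.stream().collect(Collectors.summingLong(value -> value));
    }

    public static void main(String[] args) {
        System.out.println(total(List.of(128L, 128L, 44L))); // 300
    }
}
```

For an empty input, summingLong yields 0 instead of an empty result.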

Example source: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  return response.writeWith(Flux.just("h", "e", "l", "l", "o")
      .delayElements(Duration.ofMillis(100))
      .publishOn(asyncGroup)
      .collect(dataBufferFactory::allocateBuffer, (buffer, str) -> buffer.write(str.getBytes())));
}

Example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void nullSupplier() {
  Flux.never().collect(null, (a, b) -> {});
}

Example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void nullAction() {
  Flux.never().collect(() -> 1, null);
}

Example source: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<ArrayList<Integer>> ts = AssertSubscriber.create();
  Flux.range(1, 10).collect(ArrayList<Integer>::new, (a, b) -> a.add(b)).subscribe(ts);
  ts.assertValues(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
   .assertNoError()
   .assertComplete();
}

Example source: reactor/reactor-core

@Test
public void collectToList() {
  Mono<List<Integer>> source = Flux.range(1, 5).collect(Collectors.toList());
  for (int i = 0; i < 5; i++) {
    AssertSubscriber<List<Integer>> ts = AssertSubscriber.create();
    source.subscribe(ts);
    ts.assertValues(Arrays.asList(1, 2, 3, 4, 5))
      .assertNoError()
      .assertComplete();
  }
}

Example source: reactor/reactor-core

@Test
public void supplierThrows() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.range(1, 10).collect(() -> {
    throw new RuntimeException("forced failure");
  }, (a, b) -> {
  }).subscribe(ts);
  ts.assertNoValues()
   .assertError(RuntimeException.class)
   .assertErrorWith( e -> Assert.assertTrue(e.getMessage().contains("forced failure")))
   .assertNotComplete();
}

Example source: reactor/reactor-core

@Test
public void actionThrows() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.range(1, 10).collect(() -> 1, (a, b) -> {
    throw new RuntimeException("forced failure");
  }).subscribe(ts);
  ts.assertNoValues()
   .assertError(RuntimeException.class)
   .assertErrorWith( e -> Assert.assertTrue(e.getMessage().contains("forced failure")))
   .assertNotComplete();
}

Example source: reactor/reactor-core

@Test
public void collectToSet() {
  Mono<Set<Integer>> source = Flux.just(1).repeat(5).collect(Collectors.toSet());
  for (int i = 0; i < 5; i++) {
    AssertSubscriber<Set<Integer>> ts = AssertSubscriber.create();
    source.subscribe(ts);
    ts.assertValues(Collections.singleton(1))
      .assertNoError()
      .assertComplete();
  }
}

Example source: reactor/reactor-core

@Test
public void normalBackpressured() {
  AssertSubscriber<ArrayList<Integer>> ts = AssertSubscriber.create(0);
  Flux.range(1, 10).collect(ArrayList<Integer>::new, ArrayList::add).subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertValues(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
   .assertNoError()
   .assertComplete();
}

Example source: reactor/reactor-core

@Test
public void test() throws Exception {
  Flux.just("red", "white", "blue")
    .log("source")
    .flatMap(value -> Mono.fromCallable(() -> {
                Thread.sleep(1000);
                return value;
              }).subscribeOn(Schedulers.elastic()))
    .log("merged")
    .collect(Result::new, Result::add)
    .doOnNext(Result::stop)
    .log("accumulated")
    .toFuture()
    .get();
}

Example source: reactor/reactor-core

@Test
public void supplierReturnsNull() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.range(1, 10).collect(() -> null, (a, b) -> {
  }).subscribe(ts);
  ts.assertNoValues()
   .assertError(NullPointerException.class)
   .assertNotComplete();
}

Example source: aol/cyclops

@Test
public void fluxConcatMap() {
  System.out.println(Flux.just(1, 2, 3).concatMap(i -> Flux.just(i + 100, 200))
    .collect(Collectors.toList()).block());
}
