reactor.core.publisher.Flux.concatMapIterable()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(197)

本文整理了Java中reactor.core.publisher.Flux.concatMapIterable()方法的一些代码示例,展示了Flux.concatMapIterable()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.concatMapIterable()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:concatMapIterable

Flux.concatMapIterable介绍

[英]Transform the items emitted by this Flux into Iterable, then flatten the elements from those by concatenating them into a single Flux.

Note that unlike #flatMap(Function) and #concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.
[中]将此通量发出的项目转换为Iterable,然后通过将这些元素连接为单个通量来将其展平。
请注意,与#flatMap(函数)和#concatMap(函数)不同,对于Iterable,不存在急切与懒惰内部订阅的概念。Iterables的内容都是按顺序播放的。因此,FlatMapiteTable和ConcatMapiteTable相当于为使用concat vs flatMap期望探索API的用户提供了一种可发现性改进。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Transform the items emitted by this {@link Flux} into {@link Iterable}, then flatten the elements from those by
 * concatenating them into a single {@link Flux}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/concatMapIterable.svg" alt="">
 * <p>
 * Note that unlike {@link #flatMap(Function)} and {@link #concatMap(Function)}, with Iterable there is
 * no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially.
 * Thus {@code flatMapIterable} and {@code concatMapIterable} are equivalent offered as a discoverability
 * improvement for users that explore the API with the concat vs flatMap expectation.
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
 *
 * @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
 * @param <R> the merged output sequence type
 *
 * @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux}
 */
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
  return concatMapIterable(mapper, Queues.XS_BUFFER_SIZE);
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {
  return connection.execute(node, RedisServerReactiveCommands::clientList)
      .concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
}

代码示例来源:origin: spring-projects/spring-data-redis

@Override
  public Flux<RedisClientInfo> getClientList() {

    return connection.execute(RedisServerReactiveCommands::clientList)
        .concatMapIterable(s -> LettuceConverters.stringToRedisClientListConverter().convert(s));
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void issue422(){
  Flux<Integer> source = Flux.create((sink) -> {
    for (int i = 0; i < 300; i++) {
      sink.next(i);
    }
    sink.complete();
  });
  Flux<Integer> cached = source.cache();
  long cachedCount = cached.concatMapIterable(Collections::singleton)
               .distinct().count().block();
  //System.out.println("source: " + sourceCount);
  System.out.println("cached: " + cachedCount);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .concatMapIterable(v -> Arrays.asList(v, v + 1))
    .subscribe(ts);
  ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void just() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.just(1)
    .concatMapIterable(v -> Arrays.asList(v, v + 1))
    .subscribe(ts);
  ts.assertValues(1, 2)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void empty() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>empty().concatMapIterable(v -> Arrays.asList(v, v + 1))
             .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalNoFusion() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .hide()
    .concatMapIterable(v -> Arrays.asList(v, v + 1))
    .subscribe(ts);
  ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 5)
    .concatMapIterable(v -> Arrays.asList(v, v + 1))
    .subscribe(ts);
  ts.assertNoEvents();
  ts.request(1);
  ts.assertIncomplete(1);
  ts.request(2);
  ts.assertIncomplete(1, 2, 2);
  ts.request(7);
  ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalBackpressuredNoFusion() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 5)
    .hide()
    .concatMapIterable(v -> Arrays.asList(v, v + 1))
    .subscribe(ts);
  ts.assertNoEvents();
  ts.request(1);
  ts.assertIncomplete(1);
  ts.request(2);
  ts.assertIncomplete(1, 2, 2);
  ts.request(7);
  ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param mapper
 * @return
 * @see reactor.core.publisher.Flux#concatMapIterable(java.util.function.Function)
 */
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
  return boxed.concatMapIterable(mapper);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param mapper
 * @param prefetch
 * @return
 * @see reactor.core.publisher.Flux#concatMapIterable(java.util.function.Function, int)
 */
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper,
    int prefetch) {
  return boxed.concatMapIterable(mapper, prefetch);
}
/**

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Transform the items emitted by this {@link Flux} into {@link Iterable}, then flatten the elements from those by
 * concatenating them into a single {@link Flux}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/concatMapIterable.svg" alt="">
 * <p>
 * Note that unlike {@link #flatMap(Function)} and {@link #concatMap(Function)}, with Iterable there is
 * no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially.
 * Thus {@code flatMapIterable} and {@code concatMapIterable} are equivalent offered as a discoverability
 * improvement for users that explore the API with the concat vs flatMap expectation.
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
 *
 * @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
 * @param <R> the merged output sequence type
 *
 * @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux}
 */
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
  return concatMapIterable(mapper, Queues.XS_BUFFER_SIZE);
}

代码示例来源:origin: org.springframework.data/spring-data-redis

@Override
public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {
  return connection.execute(node, RedisServerReactiveCommands::clientList)
      .concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
}

代码示例来源:origin: apache/servicemix-bundles

@Override
public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {
  return connection.execute(node, RedisServerReactiveCommands::clientList)
      .concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
}

代码示例来源:origin: org.springframework.data/spring-data-redis

@Override
  public Flux<RedisClientInfo> getClientList() {

    return connection.execute(RedisServerReactiveCommands::clientList)
        .concatMapIterable(s -> LettuceConverters.stringToRedisClientListConverter().convert(s));
  }
}

代码示例来源:origin: apache/servicemix-bundles

@Override
  public Flux<RedisClientInfo> getClientList() {

    return connection.execute(RedisServerReactiveCommands::clientList)
        .concatMapIterable(s -> LettuceConverters.stringToRedisClientListConverter().convert(s));
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

public static void main(String... args) {
    Flux<Integer> source = Flux.create((e) -> {
      for (int i = 0; i < 300; i++) {
        e.next(i);
      }
      e.complete();
    });
    Flux<Integer> cached = source.cache();

    long sourceCount = source.concatMapIterable(Collections::singleton)
        .distinct().count().block();
    long cachedCount = cached.concatMapIterable(Collections::singleton)
        .distinct().count().block();

    System.out.println("source: " + sourceCount);
    System.out.println("cached: " + cachedCount);
  }
}

相关文章

Flux类方法