本文整理了Java中reactor.core.publisher.Flux.concatMapIterable()
方法的一些代码示例,展示了Flux.concatMapIterable()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.concatMapIterable()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:concatMapIterable
[英]Transform the items emitted by this Flux into Iterable, then flatten the elements from those by concatenating them into a single Flux.
Note that unlike #flatMap(Function) and #concatMap(Function), with Iterable there is no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially. Thus flatMapIterable and concatMapIterable are equivalent offered as a discoverability improvement for users that explore the API with the concat vs flatMap expectation.
[中]将此通量发出的项目转换为Iterable,然后通过将这些元素连接为单个通量来将其展平。
请注意,与#flatMap(函数)和#concatMap(函数)不同,对于Iterable,不存在急切与懒惰内部订阅的概念。Iterables的内容都是按顺序播放的。因此,FlatMapiteTable和ConcatMapiteTable相当于为使用concat vs flatMap期望探索API的用户提供了一种可发现性改进。
代码示例来源:origin: reactor/reactor-core
/**
* Transform the items emitted by this {@link Flux} into {@link Iterable}, then flatten the elements from those by
* concatenating them into a single {@link Flux}.
*
* <p>
* <img class="marble" src="doc-files/marbles/concatMapIterable.svg" alt="">
* <p>
* Note that unlike {@link #flatMap(Function)} and {@link #concatMap(Function)}, with Iterable there is
* no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially.
* Thus {@code flatMapIterable} and {@code concatMapIterable} are equivalent offered as a discoverability
* improvement for users that explore the API with the concat vs flatMap expectation.
*
* @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
*
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param <R> the merged output sequence type
*
* @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux}
*/
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
return concatMapIterable(mapper, Queues.XS_BUFFER_SIZE);
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {
return connection.execute(node, RedisServerReactiveCommands::clientList)
.concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<RedisClientInfo> getClientList() {
return connection.execute(RedisServerReactiveCommands::clientList)
.concatMapIterable(s -> LettuceConverters.stringToRedisClientListConverter().convert(s));
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void issue422(){
Flux<Integer> source = Flux.create((sink) -> {
for (int i = 0; i < 300; i++) {
sink.next(i);
}
sink.complete();
});
Flux<Integer> cached = source.cache();
long cachedCount = cached.concatMapIterable(Collections::singleton)
.distinct().count().block();
//System.out.println("source: " + sourceCount);
System.out.println("cached: " + cachedCount);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 5)
.concatMapIterable(v -> Arrays.asList(v, v + 1))
.subscribe(ts);
ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void just() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.just(1)
.concatMapIterable(v -> Arrays.asList(v, v + 1))
.subscribe(ts);
ts.assertValues(1, 2)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void empty() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.<Integer>empty().concatMapIterable(v -> Arrays.asList(v, v + 1))
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalNoFusion() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 5)
.hide()
.concatMapIterable(v -> Arrays.asList(v, v + 1))
.subscribe(ts);
ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 5)
.concatMapIterable(v -> Arrays.asList(v, v + 1))
.subscribe(ts);
ts.assertNoEvents();
ts.request(1);
ts.assertIncomplete(1);
ts.request(2);
ts.assertIncomplete(1, 2, 2);
ts.request(7);
ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalBackpressuredNoFusion() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 5)
.hide()
.concatMapIterable(v -> Arrays.asList(v, v + 1))
.subscribe(ts);
ts.assertNoEvents();
ts.request(1);
ts.assertIncomplete(1);
ts.request(2);
ts.assertIncomplete(1, 2, 2);
ts.request(7);
ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param mapper
* @return
* @see reactor.core.publisher.Flux#concatMapIterable(java.util.function.Function)
*/
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
return boxed.concatMapIterable(mapper);
}
/**
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param mapper
* @param prefetch
* @return
* @see reactor.core.publisher.Flux#concatMapIterable(java.util.function.Function, int)
*/
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper,
int prefetch) {
return boxed.concatMapIterable(mapper, prefetch);
}
/**
代码示例来源:origin: io.projectreactor/reactor-core
/**
* Transform the items emitted by this {@link Flux} into {@link Iterable}, then flatten the elements from those by
* concatenating them into a single {@link Flux}.
*
* <p>
* <img class="marble" src="doc-files/marbles/concatMapIterable.svg" alt="">
* <p>
* Note that unlike {@link #flatMap(Function)} and {@link #concatMap(Function)}, with Iterable there is
* no notion of eager vs lazy inner subscription. The content of the Iterables are all played sequentially.
* Thus {@code flatMapIterable} and {@code concatMapIterable} are equivalent offered as a discoverability
* improvement for users that explore the API with the concat vs flatMap expectation.
*
* @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
*
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param <R> the merged output sequence type
*
* @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux}
*/
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
return concatMapIterable(mapper, Queues.XS_BUFFER_SIZE);
}
代码示例来源:origin: org.springframework.data/spring-data-redis
@Override
public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {
return connection.execute(node, RedisServerReactiveCommands::clientList)
.concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
}
代码示例来源:origin: apache/servicemix-bundles
@Override
public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {
return connection.execute(node, RedisServerReactiveCommands::clientList)
.concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
}
代码示例来源:origin: org.springframework.data/spring-data-redis
@Override
public Flux<RedisClientInfo> getClientList() {
return connection.execute(RedisServerReactiveCommands::clientList)
.concatMapIterable(s -> LettuceConverters.stringToRedisClientListConverter().convert(s));
}
}
代码示例来源:origin: apache/servicemix-bundles
@Override
public Flux<RedisClientInfo> getClientList() {
return connection.execute(RedisServerReactiveCommands::clientList)
.concatMapIterable(s -> LettuceConverters.stringToRedisClientListConverter().convert(s));
}
}
代码示例来源:origin: akarnokd/akarnokd-misc
public static void main(String... args) {
Flux<Integer> source = Flux.create((e) -> {
for (int i = 0; i < 300; i++) {
e.next(i);
}
e.complete();
});
Flux<Integer> cached = source.cache();
long sourceCount = source.concatMapIterable(Collections::singleton)
.distinct().count().block();
long cachedCount = cached.concatMapIterable(Collections::singleton)
.distinct().count().block();
System.out.println("source: " + sourceCount);
System.out.println("cached: " + cachedCount);
}
}
内容来源于网络,如有侵权,请联系作者删除!