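This article collects code examples for the Java method reactor.core.publisher.Flux.collect() and shows how Flux.collect() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects. Details of the Flux.collect() method are as follows: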
Package: reactor.core.publisher
Class name: Flux
Method name: collect
Description: Collect all elements emitted by this Flux into a user-defined container, by applying a collector BiConsumer taking the container and each element. The collected result will be emitted when this sequence completes. Several examples below use the overload that accepts a java.util.stream.Collector instead of a container supplier and a BiConsumer.
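As a minimal sketch of the description above, assuming reactor-core is on the classpath, the following collects a small range into an ArrayList and blocks for the result. The class and method names are illustrative and do not come from any of the quoted projects.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative class name; not part of Reactor or of the projects quoted in this article.
class CollectBasics {

    // Collects 1..5 into a fresh ArrayList; the Mono emits the list once the Flux completes.
    static List<Integer> collectRange() {
        Mono<ArrayList<Integer>> collected =
                Flux.range(1, 5).collect(ArrayList<Integer>::new, ArrayList::add);
        return collected.block(); // blocking is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(collectRange()); // prints [1, 2, 3, 4, 5]
    }
}
```

The first argument supplies the container and the second adds each element to it, which mirrors the Supplier and BiConsumer pair named in the description.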
Code example source: origin: codecentric/spring-boot-admin
@Override
public Mono<Instance> find(InstanceId id) {
    //hmm a simple reduce doesn't return empty when not found...
    return eventStore.find(id).collect((Supplier<AtomicReference<Instance>>) AtomicReference::new, (ref, event) -> {
        Instance instance = ref.get() != null ? ref.get() : Instance.create(id);
        ref.set(instance.apply(event));
    }).flatMap(ref -> Mono.justOrEmpty(ref.get()));
}
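The pattern in the example above (collect into an AtomicReference, then unwrap with Mono.justOrEmpty) can be sketched in isolation. This is a simplified illustration under assumed names (LatestOrEmpty, latest), not the spring-boot-admin implementation: an empty source yields an empty Mono rather than a default value.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper, for illustration only.
class LatestOrEmpty {

    // Keeps the most recent element in an AtomicReference; the reference stays null
    // when the source is empty, so justOrEmpty turns that case into an empty Mono.
    static Mono<String> latest(Flux<String> events) {
        return events
                .collect(AtomicReference<String>::new, (ref, event) -> ref.set(event))
                .flatMap(ref -> Mono.justOrEmpty(ref.get()));
    }

    public static void main(String[] args) {
        System.out.println(latest(Flux.just("a", "b")).block()); // prints b
        System.out.println(latest(Flux.empty()).block());        // prints null (empty Mono)
    }
}
```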
Code example source: origin: reactor/reactor-core
/**
 * Collect all elements emitted by this {@link Flux} into a user-defined {@link Map} that is
 * emitted by the resulting {@link Mono} when this sequence completes.
 * The key is extracted from each element by applying the {@code keyExtractor}
 * {@link Function}, and the value is extracted by the {@code valueExtractor} Function.
 * In case several elements map to the same key, the associated value will be derived
 * from the most recently emitted element.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/collectMapWithKeyAndValueExtractors.svg" alt="">
 *
 * @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
 * @param valueExtractor a {@link Function} to map elements to a value for the {@link Map}
 * @param mapSupplier a {@link Map} factory called for each {@link Subscriber}
 *
 * @param <K> the type of the key extracted from each source element
 * @param <V> the type of the value extracted from each source element
 *
 * @return a {@link Mono} of a {@link Map} of key-value pairs (only including latest
 * element's value in case of key conflicts)
 */
public final <K, V> Mono<Map<K, V>> collectMap(
        final Function<? super T, ? extends K> keyExtractor,
        final Function<? super T, ? extends V> valueExtractor,
        Supplier<Map<K, V>> mapSupplier) {
    Objects.requireNonNull(keyExtractor, "Key extractor is null");
    Objects.requireNonNull(valueExtractor, "Value extractor is null");
    Objects.requireNonNull(mapSupplier, "Map supplier is null");
    return collect(mapSupplier, (m, d) -> m.put(keyExtractor.apply(d), valueExtractor.apply(d)));
}
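To illustrate the "most recently emitted element wins" rule stated in the Javadoc above, here is a small sketch that calls collectMap with a TreeMap supplier. The class name and sample data are assumptions chosen for the illustration.

```java
import java.util.Map;
import java.util.TreeMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative class name and data; only collectMap itself comes from Reactor.
class CollectMapLastWins {

    // Keys are the first letter of each word; "avocado" overwrites "apple" under key "a".
    static Map<String, String> byFirstLetter() {
        Mono<Map<String, String>> result = Flux.just("apple", "avocado", "banana")
                .collectMap(word -> word.substring(0, 1), word -> word, TreeMap::new);
        return result.block();
    }

    public static void main(String[] args) {
        System.out.println(byFirstLetter()); // prints {a=avocado, b=banana}
    }
}
```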
Code example source: origin: reactor/reactor-core
// ... (excerpt; the start of the method is truncated in the source)
Objects.requireNonNull(valueExtractor, "Value extractor is null");
Objects.requireNonNull(mapSupplier, "Map supplier is null");
return collect(mapSupplier, (m, d) -> {
    K key = keyExtractor.apply(d);
    Collection<V> values = m.computeIfAbsent(key, k -> new ArrayList<>());
    // ... (excerpt; the rest of the lambda is truncated in the source)
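The excerpt above is cut off, so the following is not a reconstruction of it. It is a self-contained sketch of the same general idea, grouping several values under one key with computeIfAbsent inside a collect call. The names GroupWithCollect and groupByLength are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import reactor.core.publisher.Flux;

// Hypothetical helper, for illustration only.
class GroupWithCollect {

    // Groups words by their length; each key maps to a list that grows as elements arrive.
    static Map<Integer, List<String>> groupByLength(Flux<String> words) {
        return words
                .<Map<Integer, List<String>>>collect(TreeMap::new, (map, word) ->
                        map.computeIfAbsent(word.length(), k -> new ArrayList<>()).add(word))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(groupByLength(Flux.just("a", "bb", "cc", "d"))); // prints {1=[a, d], 2=[bb, cc]}
    }
}
```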
Code example source: origin: spring-projects/spring-data-redis
@Override
public Mono<Properties> info() {
    return Flux.merge(executeOnAllNodes(this::info)).collect(PropertiesCollector.INSTANCE);
}
Code example source: origin: spring-projects/spring-data-redis
@Override
public Mono<Properties> getConfig(String pattern) {
    Assert.hasText(pattern, "Pattern must not be null or empty!");
    return Flux.merge(executeOnAllNodes(node -> getConfig(node, pattern))) //
            .collect(PropertiesCollector.INSTANCE);
}
Code example source: origin: spring-projects/spring-data-redis
@Override
public Mono<Properties> info(String section) {
    Assert.hasText(section, "Section must not be null or empty!");
    return Flux.merge(executeOnAllNodes(redisClusterNode -> info(redisClusterNode, section)))
            .collect(PropertiesCollector.INSTANCE);
}
Code example source: origin: spring-projects/spring-data-redis
@Override
public Mono<Long> delete(Publisher<K> keys) {
    Assert.notNull(keys, "Keys must not be null!");
    return createFlux(connection -> connection.keyCommands() //
            .mDel(Flux.from(keys).map(this::rawKey).buffer(128)) //
            .map(CommandResponse::getOutput)) //
            .collect(Collectors.summingLong(value -> value));
}
Code example source: origin: spring-projects/spring-data-redis
@Override
public Mono<Long> unlink(Publisher<K> keys) {
    Assert.notNull(keys, "Keys must not be null!");
    return createFlux(connection -> connection.keyCommands() //
            .mUnlink(Flux.from(keys).map(this::rawKey).buffer(128)) //
            .map(CommandResponse::getOutput)) //
            .collect(Collectors.summingLong(value -> value));
}
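The two methods above sum per-batch results with Collectors.summingLong. A stripped-down sketch of that step, with no Redis involved and with assumed names (SumWithCollector, total), might look like this:

```java
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;

// Hypothetical helper, for illustration only; it stands in for the per-batch counts above.
class SumWithCollector {

    // Adds up every Long emitted by the Flux using a java.util.stream Collector.
    static Long total(Flux<Long> counts) {
        return counts.collect(Collectors.summingLong(Long::longValue)).block();
    }

    public static void main(String[] args) {
        System.out.println(total(Flux.just(3L, 4L, 5L))); // prints 12
    }
}
```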
Code example source: origin: spring-projects/spring-framework
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    return response.writeWith(Flux.just("h", "e", "l", "l", "o")
            .delayElements(Duration.ofMillis(100))
            .publishOn(asyncGroup)
            .collect(dataBufferFactory::allocateBuffer, (buffer, str) -> buffer.write(str.getBytes())));
}
Code example source: origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void nullSupplier() {
    Flux.never().collect(null, (a, b) -> {});
}
Code example source: origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void nullAction() {
    Flux.never().collect(() -> 1, null);
}
Code example source: origin: reactor/reactor-core
@Test
public void normal() {
    AssertSubscriber<ArrayList<Integer>> ts = AssertSubscriber.create();
    Flux.range(1, 10).collect(ArrayList<Integer>::new, (a, b) -> a.add(b)).subscribe(ts);
    ts.assertValues(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
            .assertNoError()
            .assertComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void collectToList() {
    Mono<List<Integer>> source = Flux.range(1, 5).collect(Collectors.toList());
    for (int i = 0; i < 5; i++) {
        AssertSubscriber<List<Integer>> ts = AssertSubscriber.create();
        source.subscribe(ts);
        ts.assertValues(Arrays.asList(1, 2, 3, 4, 5))
                .assertNoError()
                .assertComplete();
    }
}
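The test above subscribes to the same Mono five times and gets the full list each time. The sketch below, using assumed names, shows one consequence in isolation: each subscription builds its own container, so two results are equal in content but are distinct objects.

```java
import java.util.List;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative class name; the behavior shown is that of Flux.collect(Collector).
class ResubscribeCollect {

    // Returns true when two subscriptions produce equal lists held in different instances.
    static boolean freshContainerPerSubscription() {
        Mono<List<Integer>> source = Flux.range(1, 3).collect(Collectors.toList());
        List<Integer> first = source.block();  // first subscription
        List<Integer> second = source.block(); // second subscription collects again
        return first.equals(second) && first != second;
    }

    public static void main(String[] args) {
        System.out.println(freshContainerPerSubscription()); // prints true
    }
}
```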
Code example source: origin: reactor/reactor-core
@Test
public void supplierThrows() {
    AssertSubscriber<Object> ts = AssertSubscriber.create();
    Flux.range(1, 10).collect(() -> {
        throw new RuntimeException("forced failure");
    }, (a, b) -> {
    }).subscribe(ts);
    ts.assertNoValues()
            .assertError(RuntimeException.class)
            .assertErrorWith(e -> Assert.assertTrue(e.getMessage().contains("forced failure")))
            .assertNotComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void actionThrows() {
    AssertSubscriber<Object> ts = AssertSubscriber.create();
    Flux.range(1, 10).collect(() -> 1, (a, b) -> {
        throw new RuntimeException("forced failure");
    }).subscribe(ts);
    ts.assertNoValues()
            .assertError(RuntimeException.class)
            .assertErrorWith(e -> Assert.assertTrue(e.getMessage().contains("forced failure")))
            .assertNotComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void collectToSet() {
    Mono<Set<Integer>> source = Flux.just(1).repeat(5).collect(Collectors.toSet());
    for (int i = 0; i < 5; i++) {
        AssertSubscriber<Set<Integer>> ts = AssertSubscriber.create();
        source.subscribe(ts);
        ts.assertValues(Collections.singleton(1))
                .assertNoError()
                .assertComplete();
    }
}
Code example source: origin: reactor/reactor-core
@Test
public void normalBackpressured() {
    AssertSubscriber<ArrayList<Integer>> ts = AssertSubscriber.create(0);
    Flux.range(1, 10).collect(ArrayList<Integer>::new, ArrayList::add).subscribe(ts);
    ts.assertNoValues()
            .assertNoError()
            .assertNotComplete();
    ts.request(2);
    ts.assertValues(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
            .assertNoError()
            .assertComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void test() throws Exception {
    Flux.just("red", "white", "blue")
            .log("source")
            .flatMap(value -> Mono.fromCallable(() -> {
                Thread.sleep(1000);
                return value;
            }).subscribeOn(Schedulers.elastic()))
            .log("merged")
            .collect(Result::new, Result::add)
            .doOnNext(Result::stop)
            .log("accumulated")
            .toFuture()
            .get();
}
Code example source: origin: reactor/reactor-core
@Test
public void supplierReturnsNull() {
    AssertSubscriber<Object> ts = AssertSubscriber.create();
    Flux.range(1, 10).collect(() -> null, (a, b) -> {
    }).subscribe(ts);
    ts.assertNoValues()
            .assertError(NullPointerException.class)
            .assertNotComplete();
}
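The test above asserts that a supplier returning null ends in a NullPointerException. Outside a test harness, a caller that blocks on the result would see that exception thrown from block(). The sketch below assumes that behavior and uses hypothetical names.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Illustrative class name; demonstrates the failure mode, not a recommended usage.
class NullContainerDemo {

    // Returns a short label describing how the pipeline ended.
    static String outcome() {
        try {
            Flux.range(1, 3)
                    .collect(() -> (List<Integer>) null, (list, i) -> list.add(i))
                    .block();
            return "completed";
        } catch (NullPointerException e) {
            // The null container is rejected and surfaces as a NullPointerException.
            return "NullPointerException";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome()); // prints NullPointerException
    }
}
```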
Code example source: origin: aol/cyclops
@Test
public void fluxConcatMap() {
    System.out.println(Flux.just(1, 2, 3).concatMap(i -> Flux.just(i + 100, 200))
            .collect(Collectors.toList()).block());
}
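For reference, the same pipeline as in the test above can be run as a standalone program. Because concatMap subscribes to the inner publishers one at a time and preserves source order, the collected list is deterministic. The class and method names are assumptions for this sketch.

```java
import java.util.List;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;

// Illustrative class name; the pipeline mirrors the test shown above.
class ConcatMapCollect {

    // Each source element i is expanded to (i + 100, 200), in source order.
    static List<Integer> run() {
        return Flux.just(1, 2, 3)
                .concatMap(i -> Flux.just(i + 100, 200))
                .collect(Collectors.toList())
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [101, 200, 102, 200, 103, 200]
    }
}
```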