本文整理了Java中reactor.core.publisher.Flux.identityFunction()
方法的一些代码示例,展示了Flux.identityFunction()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.identityFunction()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:identityFunction
暂无
代码示例来源:origin: reactor/reactor-core
/**
* Collect all elements emitted by this {@link Flux} into a {@link Map multimap} that is
* emitted by the resulting {@link Mono} when this sequence completes.
* The key is extracted from each element by applying the {@code keyExtractor}
* {@link Function}, and every element mapping to the same key is stored in the {@link List}
* associated to said key.
*
* <p>
* <img class="marble" src="doc-files/marbles/collectMultiMapWithKeyExtractor.svg" alt="">
*
* @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
*
* @param <K> the type of the key extracted from each source element
* @return a {@link Mono} of a {@link Map} of key-List(elements) pairs
*/
public final <K> Mono<Map<K, Collection<T>>> collectMultimap(Function<? super T, ? extends K> keyExtractor) {
return collectMultimap(keyExtractor, identityFunction());
}
代码示例来源:origin: reactor/reactor-core
/**
* Collect all elements emitted by this {@link Flux} into a hashed {@link Map} that is
* emitted by the resulting {@link Mono} when this sequence completes.
* The key is extracted from each element by applying the {@code keyExtractor}
* {@link Function}. In case several elements map to the same key, the associated value
* will be the most recently emitted element.
*
* <p>
* <img class="marble" src="doc-files/marbles/collectMapWithKeyExtractor.svg" alt="">
*
* @param keyExtractor a {@link Function} to map elements to a key for the {@link Map}
* @param <K> the type of the key extracted from each source element
*
* @return a {@link Mono} of a {@link Map} of key-element pairs (only including latest
* element in case of key conflicts)
*/
public final <K> Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor) {
return collectMap(keyExtractor, identityFunction());
}
代码示例来源:origin: reactor/reactor-core
/**
* Divide this sequence into dynamically created {@link Flux} (or groups) for each
* unique key, as produced by the provided keyMapper {@link Function}. Note that
* groupBy works best with a low cardinality of groups, so chose your keyMapper
* function accordingly.
*
* <p>
* <img class="marble" src="doc-files/marbles/groupByWithKeyMapper.svg" alt="">
*
* <p>
* The groups need to be drained and consumed downstream for groupBy to work correctly.
* Notably when the criteria produces a large amount of groups, it can lead to hanging
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* @param keyMapper the key mapping {@link Function} that evaluates an incoming data and returns a key.
* @param <K> the key type extracted from each value of this sequence
*
* @return a {@link Flux} of {@link GroupedFlux} grouped sequences
*/
public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper) {
return groupBy(keyMapper, identityFunction());
}
代码示例来源:origin: reactor/reactor-core
/**
* Sort elements from this {@link Flux} by collecting and sorting them in the background
* then emitting the sorted sequence once this sequence completes.
* Each item emitted by the {@link Flux} must implement {@link Comparable} with
* respect to all other items in the sequence.
*
* <p>Note that calling {@code sort} with long, non-terminating or infinite sources
* might cause {@link OutOfMemoryError}. Use sequence splitting like {@link #window} to sort batches in that case.
*
* @throws ClassCastException if any item emitted by the {@link Flux} does not implement
* {@link Comparable} with respect to all other items emitted by the {@link Flux}
* @return a sorted {@link Flux}
*/
public final Flux<T> sort(){
return collectSortedList().flatMapIterable(identityFunction());
}
代码示例来源:origin: reactor/reactor-core
/**
* Sort elements from this {@link Flux} using a {@link Comparator} function, by
* collecting and sorting elements in the background then emitting the sorted sequence
* once this sequence completes.
*
* <p>Note that calling {@code sort} with long, non-terminating or infinite sources
* might cause {@link OutOfMemoryError}
*
* @param sortFunction a function that compares two items emitted by this {@link Flux}
* to indicate their sort order
* @return a sorted {@link Flux}
*/
public final Flux<T> sort(Comparator<? super T> sortFunction) {
return collectSortedList(sortFunction).flatMapIterable(identityFunction());
}
代码示例来源:origin: reactor/reactor-core
static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
boolean delayError, int maxConcurrency, int prefetch) {
return onAssembly(new FluxMergeSequential<>(from(sources),
identityFunction(),
maxConcurrency, prefetch, delayError ? FluxConcatMap.ErrorMode.END :
FluxConcatMap.ErrorMode.IMMEDIATE));
}
代码示例来源:origin: reactor/reactor-core
static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
boolean delayError, int maxConcurrency, int prefetch) {
return onAssembly(new FluxMergeSequential<>(new FluxIterable<>(sources),
identityFunction(), maxConcurrency, prefetch,
delayError ? FluxConcatMap.ErrorMode.END : FluxConcatMap.ErrorMode.IMMEDIATE));
}
代码示例来源:origin: reactor/reactor-core
@SafeVarargs
static <I> Flux<I> mergeSequential(int prefetch, boolean delayError,
Publisher<? extends I>... sources) {
if (sources.length == 0) {
return empty();
}
if (sources.length == 1) {
return from(sources[0]);
}
return onAssembly(new FluxMergeSequential<>(new FluxArray<>(sources),
identityFunction(), sources.length, prefetch,
delayError ? FluxConcatMap.ErrorMode.END : FluxConcatMap.ErrorMode.IMMEDIATE));
}
代码示例来源:origin: reactor/reactor-core
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void mappingBadPrefetch() throws Exception {
Flux<Integer> source = Flux.just(1);
try {
Flux.just(source, source, source).flatMapSequential(Flux.identityFunction(), 10, -99);
} catch (IllegalArgumentException ex) {
assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
}
}
代码示例来源:origin: reactor/reactor-core
@Test
//see https://github.com/reactor/reactor-core/issues/577
public void collectionSupplierLimitedFifo() {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 1, 3, 4, 1, 1, 1, 1, 2);
StepVerifier.create(flux.distinct(Flux.identityFunction(), () -> new NaiveFifoQueue<>(5)))
.expectNext(1, 2, 3, 4, 5, 6, 1, 2)
.verifyComplete();
StepVerifier.create(flux.distinct(Flux.identityFunction(),
() -> new NaiveFifoQueue<>(3)))
.expectNext(1, 2, 3, 4, 5, 6, 1, 3, 4, 2)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void mainDoneThenError() {
TestPublisher<Integer> source = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
StepVerifier.create(source.flux()
.windowWhen(Flux.never(), v -> Mono.just(1))
.flatMap(Flux.identityFunction()))
.then(() -> source.complete().error(new IllegalStateException("boom")))
.expectComplete()
.verifyThenAssertThat()
.hasDroppedErrorWithMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void mainDoneThenNext() {
TestPublisher<Integer> source = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
StepVerifier.create(source.flux()
.windowWhen(Flux.never(), v -> Mono.just(1))
.flatMap(Flux.identityFunction()))
.then(() -> source.complete().next(1))
.expectComplete()
.verifyThenAssertThat()
.hasDropped(1);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void mainDoneThenComplete() {
TestPublisher<Integer> source = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
StepVerifier.create(source.flux()
.windowWhen(Flux.never(), v -> Mono.just(1))
.flatMap(Flux.identityFunction()))
.then(() -> source.complete().complete())
.expectComplete()
.verifyThenAssertThat()
.hasNotDroppedErrors()
.hasNotDroppedElements();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void startDoneThenError() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
final TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.error(new IllegalStateException("boom"))
.error(new IllegalStateException("boom2")))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDroppedErrorWithMessage("boom2");
source.assertNoSubscribers();
//start doesn't cleanup and as such still has a subscriber
end.assertNoSubscribers();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void endDoneThenError() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.create();
TestPublisher<Integer> end = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.next(1))
.then(() -> end.error(new IllegalStateException("boom"))
.error(new IllegalStateException("boom2")))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDroppedErrorWithMessage("boom2");
source.assertNoSubscribers();
start.assertNoSubscribers();
//end doesn't cleanup and as such still has a subscriber
}
代码示例来源:origin: reactor/reactor-core
@Test
public void startError() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.create();
final TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.error(new IllegalStateException("boom")))
.expectErrorMessage("boom")
.verify();
source.assertNoSubscribers();
start.assertNoSubscribers();
end.assertNoSubscribers();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void startDoneThenComplete() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
final TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.error(new IllegalStateException("boom"))
.complete())
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasNotDroppedErrors();
source.assertNoSubscribers();
//start doesn't cleanup and as such still has a subscriber
end.assertNoSubscribers();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void endError() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.create();
TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.next(1))
.then(() -> end.error(new IllegalStateException("boom")))
.expectErrorMessage("boom")
.verify();
source.assertNoSubscribers();
start.assertNoSubscribers();
end.assertNoSubscribers();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void endDoneThenComplete() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.create();
TestPublisher<Integer> end = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.next(1))
.then(() -> end.error(new IllegalStateException("boom"))
.complete())
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasNotDroppedErrors();
source.assertNoSubscribers();
start.assertNoSubscribers();
//end doesn't cleanup and as such still has a subscriber
}
代码示例来源:origin: reactor/reactor-core
@Test
public void startDoneThenNext() {
TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> start = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE);
final TestPublisher<Integer> end = TestPublisher.create();
StepVerifier.create(source.flux()
.windowWhen(start, v -> end)
.flatMap(Flux.identityFunction())
)
.then(() -> start.error(new IllegalStateException("boom"))
.next(1))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasNotDroppedErrors()
.hasNotDroppedElements();
source.assertNoSubscribers();
//start doesn't cleanup and as such still has a subscriber
end.assertNoSubscribers();
}
内容来源于网络,如有侵权,请联系作者删除!