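This article collects code examples for the Java method reactor.core.publisher.Flux.startWith() and shows how Flux.startWith() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of the Flux.startWith() method are as follows: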
Package: reactor.core.publisher
Class name: Flux
Method name: startWith
Description: Prepend the given Iterable before this Flux sequence.
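As a minimal sketch of the description above, the following assumes reactor-core is on the classpath. The class StartWithOverloads and its method names are hypothetical and exist only for illustration. It exercises the three overloads that appear in the examples on this page (Iterable, varargs values, and Publisher) and collects each result into a list so the ordering is visible.

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class (not part of Reactor); requires reactor-core on the classpath.
final class StartWithOverloads {

    // startWith(Iterable): prepends the elements of a collection.
    static List<Integer> withIterable() {
        return Flux.just(1, 2, 3)
                .startWith(Arrays.asList(-1, 0))
                .collectList()
                .block();
    }

    // startWith(T...): prepends individual values.
    static List<Integer> withValues() {
        return Flux.just(4, 5)
                .startWith(1, 2, 3)
                .collectList()
                .block();
    }

    // startWith(Publisher): prepends the elements of another publisher.
    static List<Integer> withPublisher() {
        return Flux.just(1, 2)
                .startWith(Flux.just(3))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withIterable());
        System.out.println(withValues());
        System.out.println(withPublisher());
    }
}
```

In each case the prepended elements come first, so the Iterable variant yields [-1, 0, 1, 2, 3], which matches the StepVerifier expectation in the reactor-core test further down.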
Code example source: reactor/reactor-core
/**
 * Prepend the given {@link Iterable} before this {@link Flux} sequence.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/startWithIterable.svg" alt="">
 *
 * @param iterable the sequence of values to start the resulting {@link Flux} with
 *
 * @return a new {@link Flux} prefixed with elements from an {@link Iterable}
 */
public final Flux<T> startWith(Iterable<? extends T> iterable) {
    return startWith(fromIterable(iterable));
}
Code example source: reactor/reactor-core
/**
 * Prepend the given values before this {@link Flux} sequence.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/startWithValues.svg" alt="">
 *
 * @param values the array of values to start the resulting {@link Flux} with
 *
 * @return a new {@link Flux} prefixed with the given elements
 */
@SafeVarargs
public final Flux<T> startWith(T... values) {
    return startWith(just(values));
}
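Code example source: reactor/reactor-core
// Excerpt: the original lambda bodies are truncated in the source,
// so only the statements that survive are shown here.
lasts.startWith(firsts).subscribe(it -> res2.add("" + it));
lasts.startWith(1, 2, 3).subscribe(it -> res3.add("" + it));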
Code example source: reactor/reactor-core
@Test
public void startWith() {
    StepVerifier.create(Flux.just(1, 2, 3).startWith(Arrays.asList(-1, 0)))
                .expectNext(-1, 0, 1, 2, 3)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void advancedBatchingGrouping() {
    StepVerifier.create(
            Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
                .groupBy(i -> i % 2 == 0 ? "even" : "odd")
                .concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
                                 .map(String::valueOf) //map to string
                                 .startWith(g.key())) //start with the group's key
        )
        .expectNext("odd", "1", "3", "5", "11", "13")
        .expectNext("even", "2", "4", "6", "12")
        .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void noStackOverflow() {
    int n = 5000;
    Flux<Integer> source = Flux.just(1);
    Flux<Integer> result = source;
    for (int i = 0; i < n; i++) {
        result = result.startWith(source);
    }
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    result.subscribe(ts);
    ts.assertValueCount(n + 1)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void dontBreakFluxArrayConcatMap() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.just(1, 2).concatMap(Flux::just).startWith(Flux.just(3))
        .subscribe(ts);
    ts.assertValues(3, 1, 2)
      .assertNoError()
      .assertComplete();
}
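The test above expects 3, 1, 2, which means the prepended publisher is drained before the main sequence. The following sketch makes that ordering observable by logging lifecycle events. It assumes reactor-core is on the classpath, and the class StartWithOrder and its log messages are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo class (not part of Reactor); requires reactor-core on the classpath.
final class StartWithOrder {

    // Records the order in which values arrive and the main source is subscribed.
    static List<String> events() {
        List<String> log = new ArrayList<>();

        Flux<Integer> main = Flux.just(1, 2)
                .doOnSubscribe(s -> log.add("main subscribed"));
        Flux<Integer> prefix = Flux.just(3)
                .doOnComplete(() -> log.add("prefix completed"));

        // blockLast() subscribes and waits until the combined sequence completes.
        main.startWith(prefix)
            .doOnNext(v -> log.add("value " + v))
            .blockLast();

        return log;
    }

    public static void main(String[] args) {
        events().forEach(System.out::println);
    }
}
```

The log should show the prefix value before the main source is subscribed, which is why side effects attached to the main Flux do not run until the prefix has finished.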
Code example source: reactor/reactor-core
@Test
public void noStackOverflow2() {
    int n = 5000;
    Flux<Integer> source = Flux.just(1, 2).concatMap(Flux::just);
    Flux<Integer> add = Flux.just(3);
    Flux<Integer> result = source;
    for (int i = 0; i < n; i++) {
        result = result.startWith(add);
    }
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    result.subscribe(ts);
    ts.assertValueCount(n + 2)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void noStackOverflow3() {
    int n = 5000;
    Flux<Flux<Integer>> source = Flux.just(Flux.just(1), Flux.just(2));
    Flux<Flux<Integer>> add = Flux.just(Flux.just(3));
    Flux<Flux<Integer>> result = source;
    for (int i = 0; i < n; i++) {
        result = result.startWith(add);
    }
    AssertSubscriber<Object> ts = AssertSubscriber.create();
    result.subscribe(ts);
    ts.assertValueCount(n + 2)
      .assertNoError()
      .assertComplete();
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param values
 * @return
 * @see reactor.core.publisher.Flux#startWith(java.lang.Object[])
 */
public final Flux<T> startWith(T... values) {
    return boxed.startWith(values);
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param iterable
 * @return
 * @see reactor.core.publisher.Flux#startWith(java.lang.Iterable)
 */
public final Flux<T> startWith(Iterable<? extends T> iterable) {
    return boxed.startWith(iterable);
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param publisher
 * @return
 * @see reactor.core.publisher.Flux#startWith(org.reactivestreams.Publisher)
 */
public final Flux<T> startWith(Publisher<? extends T> publisher) {
    return boxed.startWith(publisher);
}
Code example source: io.pivotal/pivotal-cloudfoundry-client-reactor
private static <T> Function<T, Flux<T>> requestAdditionalPages(Function<Integer, Mono<T>> pageSupplier, Function<T, Integer> totalPagesSupplier) {
    return response -> {
        Integer totalPages = Optional.ofNullable(totalPagesSupplier.apply(response)).orElse(1);
        return Flux
            .range(2, totalPages - 1)
            .flatMap(pageSupplier)
            .startWith(response)
            .buffer()
            .flatMapIterable(d -> d);
    };
}
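The paging idiom above relies on Flux.range(start, count) taking a count rather than an end value, so range(2, totalPages - 1) covers pages 2 through totalPages, and startWith puts the already-fetched first response at the front. A simplified sketch of the same idea follows. It assumes reactor-core is on the classpath, uses a hypothetical fetchPage helper that returns a label instead of a real response, and swaps flatMap for concatMap so that page order is preserved.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class (not part of the Cloud Foundry client); requires reactor-core.
final class PagingSketch {

    // Hypothetical page fetcher: stands in for a remote call.
    static Mono<String> fetchPage(int page) {
        return Mono.just("page-" + page);
    }

    // Emits the already-fetched first page, then pages 2..totalPages in order.
    static Flux<String> allPages(String firstPage, int totalPages) {
        return Flux.range(2, totalPages - 1)          // (start, count): pages 2..totalPages
                .concatMap(PagingSketch::fetchPage)   // concatMap keeps the page order
                .startWith(firstPage);                // first page goes to the front
    }

    public static void main(String[] args) {
        List<String> pages = allPages("page-1", 3).collectList().block();
        System.out.println(pages);
    }
}
```

With totalPages equal to 1 the range is empty, so the result contains only the first page.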
Code example source: scalecube/scalecube-services
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    return Flux.from(HeadAndTail.createFrom(Flux.from(payloads).map(this::toMessage)))
        .flatMap(
            pair -> {
                ServiceMessage message = pair.head();
                validateRequest(message);
                Flux<ServiceMessage> messages = Flux.from(pair.tail()).startWith(message);
                ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier());
                return methodInvoker.invokeBidirectional(messages, ServiceMessageCodec::decodeData);
            })
        .map(this::toPayload);
}
Code example source: io.scalecube/rsocket-services-transport
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    return Flux.from(HeadAndTail.createFrom(Flux.from(payloads).map(this::toMessage)))
        .flatMap(
            pair -> {
                ServiceMessage message = pair.head();
                validateRequest(message);
                Flux<ServiceMessage> messages = Flux.from(pair.tail()).startWith(message);
                ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier());
                return methodInvoker.invokeBidirectional(messages, ServiceMessageCodec::decodeData);
            })
        .onErrorResume(this::toMessage)
        .map(this::toPayload);
}
Code example source: akarnokd/akarnokd-misc
static <T> Function<Flux<T>, Mono<Void>> composeIfNonEmpty(Function<? super Flux<T>, ? extends Mono<Void>> f) {
    return g ->
        g.publish(h ->
            h.limitRequest(1).concatMap(first -> f.apply(h.startWith(first)))
        ).ignoreElements();
}
Code example source: akarnokd/akarnokd-misc
@Test
public void simple2() {
    Function<Flux<Integer>, Flux<Integer>> transform = g -> g.doOnNext(System.out::println);
    Mono<Integer> source = Flux.range(1, 5)
        .publish(f ->
            f.limitRequest(1)
             .concatMap(first -> transform.apply(f.startWith(first)))
        )
        .ignoreElements()
        ;
    source.subscribeWith(new TestSubscriber<Integer>())
          .assertResult();
}