reactor.core.publisher.Flux.startWith()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.0k)|赞(0)|评价(0)|浏览(259)

本文整理了Java中reactor.core.publisher.Flux.startWith()方法的一些代码示例,展示了Flux.startWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.startWith()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:startWith

Flux.startWith介绍

[英]Prepend the given Iterable before this Flux sequence.
[中]在此通量序列之前,预先结束给定的Iterable。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Prepend the given {@link Iterable} before this {@link Flux} sequence.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/startWithIterable.svg" alt="">
 *
 * @param iterable the sequence of values to start the resulting {@link Flux} with
 *
 * @return a new {@link Flux} prefixed with elements from an {@link Iterable}
 */
public final Flux<T> startWith(Iterable<? extends T> iterable) {
  return startWith(fromIterable(iterable));
}

代码示例来源:origin: reactor/reactor-core

/**
 * Prepend the given values before this {@link Flux} sequence.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/startWithValues.svg" alt="">
 *
 * @param values the array of values to start the resulting {@link Flux} with
 *
 * @return a new {@link Flux} prefixed with the given elements
 */
@SafeVarargs
public final Flux<T> startWith(T... values) {
  return startWith(just(values));
}

代码示例来源:origin: reactor/reactor-core

lasts.startWith(firsts).subscribe(
    it -> {
      res2.add("" + it);
lasts.startWith(1, 2, 3).subscribe(
    it -> {
      res3.add("" + it);

代码示例来源:origin: reactor/reactor-core

@Test
public void startWith(){
  StepVerifier.create(Flux.just(1, 2, 3).startWith(Arrays.asList(-1, 0)))
        .expectNext(-1, 0, 1, 2, 3)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void advancedBatchingGrouping() {
  StepVerifier.create(
      Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .groupBy(i -> i % 2 == 0 ? "even" : "odd")
        .concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
                 .map(String::valueOf) //map to string
                 .startWith(g.key())) //start with the group's key
  )
        .expectNext("odd", "1", "3", "5", "11", "13")
        .expectNext("even", "2", "4", "6", "12")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noStackOverflow() {
  int n = 5000;
  
  Flux<Integer> source = Flux.just(1);
  
  Flux<Integer> result = source;
  
  for (int i = 0; i < n; i++) {
    result = result.startWith(source);
  }
  
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  
  result.subscribe(ts);
  
  ts.assertValueCount(n + 1)
  .assertNoError()
  .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void dontBreakFluxArrayConcatMap() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    
    Flux.just(1, 2).concatMap(Flux::just).startWith(Flux.just(3))
    .subscribe(ts);

    
    ts.assertValues(3, 1, 2)
    .assertNoError()
    .assertComplete();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noStackOverflow2() {
  int n = 5000;
  
  Flux<Integer> source = Flux.just(1, 2).concatMap(Flux::just);
  Flux<Integer> add = Flux.just(3);
  
  Flux<Integer> result = source;
  
  for (int i = 0; i < n; i++) {
    result = result.startWith(add);
  }
  
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  
  result.subscribe(ts);
  
  ts.assertValueCount(n + 2)
  .assertNoError()
  .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noStackOverflow3() {
  int n = 5000;
  
  Flux<Flux<Integer>> source = Flux.just(Flux.just(1), Flux.just(2));
  Flux<Flux<Integer>> add = Flux.just(Flux.just(3));
  
  Flux<Flux<Integer>> result = source;
  
  for (int i = 0; i < n; i++) {
    result = result.startWith(add);
  }
  
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  
  result.subscribe(ts);
  
  ts.assertValueCount(n + 2)
  .assertNoError()
  .assertComplete();
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param values
 * @return
 * @see reactor.core.publisher.Flux#startWith(java.lang.Object[])
 */
public final Flux<T> startWith(T... values) {
  return boxed.startWith(values);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param iterable
 * @return
 * @see reactor.core.publisher.Flux#startWith(java.lang.Iterable)
 */
public final Flux<T> startWith(Iterable<? extends T> iterable) {
  return boxed.startWith(iterable);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param publisher
 * @return
 * @see reactor.core.publisher.Flux#startWith(org.reactivestreams.Publisher)
 */
public final Flux<T> startWith(Publisher<? extends T> publisher) {
  return boxed.startWith(publisher);
}
/**

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Prepend the given {@link Iterable} before this {@link Flux} sequence.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/startWithIterable.svg" alt="">
 *
 * @param iterable the sequence of values to start the resulting {@link Flux} with
 *
 * @return a new {@link Flux} prefixed with elements from an {@link Iterable}
 */
public final Flux<T> startWith(Iterable<? extends T> iterable) {
  return startWith(fromIterable(iterable));
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Prepend the given values before this {@link Flux} sequence.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/startWithValues.svg" alt="">
 *
 * @param values the array of values to start the resulting {@link Flux} with
 *
 * @return a new {@link Flux} prefixed with the given elements
 */
@SafeVarargs
public final Flux<T> startWith(T... values) {
  return startWith(just(values));
}

代码示例来源:origin: io.pivotal/pivotal-cloudfoundry-client-reactor

private static <T> Function<T, Flux<T>> requestAdditionalPages(Function<Integer, Mono<T>> pageSupplier, Function<T, Integer> totalPagesSupplier) {
  return response -> {
    Integer totalPages = Optional.ofNullable(totalPagesSupplier.apply(response)).orElse(1);
    return Flux
      .range(2, totalPages - 1)
      .flatMap(pageSupplier)
      .startWith(response)
      .buffer()
      .flatMapIterable(d -> d);
  };
}

代码示例来源:origin: scalecube/scalecube-services

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
 return Flux.from(HeadAndTail.createFrom(Flux.from(payloads).map(this::toMessage)))
   .flatMap(
     pair -> {
      ServiceMessage message = pair.head();
      validateRequest(message);
      Flux<ServiceMessage> messages = Flux.from(pair.tail()).startWith(message);
      ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier());
      return methodInvoker.invokeBidirectional(messages, ServiceMessageCodec::decodeData);
     })
   .map(this::toPayload);
}

代码示例来源:origin: io.scalecube/scalecube-services-transport-rsocket

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
 return Flux.from(HeadAndTail.createFrom(Flux.from(payloads).map(this::toMessage)))
   .flatMap(
     pair -> {
      ServiceMessage message = pair.head();
      validateRequest(message);
      Flux<ServiceMessage> messages = Flux.from(pair.tail()).startWith(message);
      ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier());
      return methodInvoker.invokeBidirectional(messages, ServiceMessageCodec::decodeData);
     })
   .map(this::toPayload);
}

代码示例来源:origin: io.scalecube/rsocket-services-transport

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
 return Flux.from(HeadAndTail.createFrom(Flux.from(payloads).map(this::toMessage)))
   .flatMap(
     pair -> {
      ServiceMessage message = pair.head();
      validateRequest(message);
      Flux<ServiceMessage> messages = Flux.from(pair.tail()).startWith(message);
      ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier());
      return methodInvoker.invokeBidirectional(messages, ServiceMessageCodec::decodeData);
     })
   .onErrorResume(this::toMessage)
   .map(this::toPayload);
}

代码示例来源:origin: akarnokd/akarnokd-misc

static <T> Function<Flux<T>, Mono<Void>> composeIfNonEmpty(Function<? super Flux<T>, ? extends Mono<Void>> f) {
  return g ->
    g.publish(h -> 
      h.limitRequest(1).concatMap(first -> f.apply(h.startWith(first)))
    ).ignoreElements();
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void simple2() {
  Function<Flux<Integer>, Flux<Integer>> transform = g -> g.doOnNext(System.out::println);
  Mono<Integer> source = Flux.range(1, 5)
      .publish(f ->
        f.limitRequest(1)
          .concatMap(first -> transform.apply(f.startWith(first)))
      )
      .ignoreElements()
  ;
  source.subscribeWith(new TestSubscriber<Integer>())
  .assertResult();
}

相关文章

Flux类方法