reactor.core.publisher.Flux.first()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.2k)|赞(0)|评价(0)|浏览(504)

本文整理了Java中reactor.core.publisher.Flux.first()方法的一些代码示例,展示了Flux.first()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.first()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:first

Flux.first介绍

[英]Pick the first Publisher to emit any signal (onNext/onError/onComplete) and replay all signals from that Publisher, effectively behaving like the fastest of these competing sources.
[中]选择第一个发出任何信号的发布者(onNext/onError/onComplete)并重播来自该发布者的所有信号,有效地表现为这些竞争源中最快的一个。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Pick the first {@link Publisher} between this {@link Flux} and another publisher
 * to emit any signal (onNext/onError/onComplete) and replay all signals from that
 * {@link Publisher}, effectively behaving like the fastest of these competing sources.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/orForFlux.svg" alt="">
 *
 * @param other the {@link Publisher} to race with
 *
 * @return the fastest sequence
 * @see #first
 */
public final Flux<T> or(Publisher<? extends T> other) {
  if (this instanceof FluxFirstEmitting) {
    FluxFirstEmitting<T> publisherAmb = (FluxFirstEmitting<T>) this;
    FluxFirstEmitting<T> result = publisherAmb.ambAdditionalSource(other);
    if (result != null) {
      return result;
    }
  }
  return first(this, other);
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void arrayNull() {
  Flux.first((Publisher<Integer>[]) null);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void sampleAmbTest() throws Exception {
  int elements = 40;
  CountDownLatch latch = new CountDownLatch(elements / 2 + 1);
  Flux<SensorData> p = Flux.first(sensorOdd(), sensorEven())
                 .log("first");
  p.subscribe(d -> latch.countDown(), null, latch::countDown);
  Thread.sleep(1000);
  generateData(elements);
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void pairWiseIterable() {
    Flux<Integer> f = Flux.first(Arrays.asList(Mono.just(1), Mono.just(2)))
               .or(Mono.just(3));

    Assert.assertTrue(f instanceof FluxFirstEmitting);
    FluxFirstEmitting<Integer> s = (FluxFirstEmitting<Integer>) f;
    Assert.assertTrue(s.array != null);
    Assert.assertTrue(s.array.length == 2);

    f.subscribeWith(AssertSubscriber.create())
     .assertValues(1)
     .assertComplete();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void pairWise() {
  Flux<Integer> f = Flux.first(Mono.just(1), Mono.just(2))
             .or(Mono.just(3));
  Assert.assertTrue(f instanceof FluxFirstEmitting);
  FluxFirstEmitting<Integer> s = (FluxFirstEmitting<Integer>) f;
  Assert.assertTrue(s.array != null);
  Assert.assertTrue(s.array.length == 3);
  f.subscribeWith(AssertSubscriber.create())
   .assertValues(1)
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void singleIterableNullSource() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.first(Arrays.asList((Publisher<Object>) null))
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(NullPointerException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void secondEmitsError() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  RuntimeException ex = new RuntimeException("forced failure");
  Flux.first(Flux.never(), Flux.<Integer>error(ex))
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(ex.getClass());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void iterableOneIsNullSource() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.first(Arrays.asList(Flux.never(),
      (Publisher<Object>) null,
      Flux.never()))
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(NullPointerException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void singleArrayNullSource() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.first((Publisher<Object>) null)
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(NullPointerException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void firstWinner() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.first(Flux.range(1, 10), Flux.range(11, 10))
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void firstWinnerBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(5);
  Flux.first(Flux.range(1, 10), Flux.range(11, 10))
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5)
   .assertNotComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void arrayOneIsNullSource() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.first(Flux.never(), null, Flux.never())
    .subscribe
   (ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(NullPointerException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void secondWinner() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.first(Flux.never(),
      Flux.range(11, 10)
        .log())
    .subscribe(ts);
  ts.assertValues(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
   .assertComplete()
   .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void dontBreakAmb() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  
  Flux.first(Flux.just(1), Flux.just(2)).or(Flux.just(3))
    .subscribe(ts);
  
  ts.assertValues(1)
  .assertNoError()
  .assertComplete();
}

代码示例来源:origin: rsocket/rsocket-java

@Test
public void testChannelRequestCancellation() {
 MonoProcessor<Void> cancelled = MonoProcessor.create();
 Flux<Payload> request = Flux.<Payload>never().doOnCancel(cancelled::onComplete);
 rule.socket.requestChannel(request).subscribe().dispose();
 Flux.first(
     cancelled,
     Flux.error(new IllegalStateException("Channel request not cancelled"))
       .delaySubscription(Duration.ofSeconds(1)))
   .blockFirst();
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Pick the first {@link Publisher} between this {@link Flux} and another publisher
 * to emit any signal (onNext/onError/onComplete) and replay all signals from that
 * {@link Publisher}, effectively behaving like the fastest of these competing sources.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/orForFlux.svg" alt="">
 *
 * @param other the {@link Publisher} to race with
 *
 * @return the fastest sequence
 * @see #first
 */
public final Flux<T> or(Publisher<? extends T> other) {
  if (this instanceof FluxFirstEmitting) {
    FluxFirstEmitting<T> publisherAmb = (FluxFirstEmitting<T>) this;
    FluxFirstEmitting<T> result = publisherAmb.ambAdditionalSource(other);
    if (result != null) {
      return result;
    }
  }
  return first(this, other);
}

相关文章

Flux类方法