reactor.core.publisher.Flux.replay()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(11.5k)|赞(0)|评价(0)|浏览(357)

本文整理了Java中reactor.core.publisher.Flux.replay()方法的一些代码示例,展示了Flux.replay()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.replay()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:replay

Flux.replay介绍

[英]Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded amount of onNext signals. Completion and Error will also be replayed.
[中]将此流量转化为热源,并缓存最后发出的信号,以供进一步使用。将保留无限数量的onNext信号。完成和错误也将被重播。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further {@link Subscriber}. Will
 * retain an unbounded amount of onNext signals. Completion and Error will also be
 * replayed.
 * <p>
 * <img class="marble" src="doc-files/marbles/replay.svg" alt="">
 *
 * @return a replaying {@link ConnectableFlux}
 */
public final ConnectableFlux<T> replay() {
  return replay(Integer.MAX_VALUE);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Turn this {@link Flux} into a connectable hot source and cache last emitted signals
 * for further {@link Subscriber}. Will retain each onNext up to the given per-item
 * expiry timeout.
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/replayWithTtl.svg" alt="">
 *
 * @param ttl Per-item and post termination timeout duration
 *
 * @return a replaying {@link ConnectableFlux}
 */
public final ConnectableFlux<T> replay(Duration ttl) {
  return replay(Integer.MAX_VALUE, ttl);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Turn this {@link Flux} into a connectable hot source and cache last emitted signals
 * for further {@link Subscriber}. Will retain onNext signal for up to the given
 * {@link Duration} with a per-item ttl.
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription
 * <p>
 * <img class="marble" src="doc-files/marbles/replayWithTtl.svg" alt="">
 *
 * @param ttl Per-item and post termination timeout duration
 * @param timer a time-capable {@link Scheduler} instance to read current time from
 *
 * @return a replaying {@link ConnectableFlux}
 */
public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer) {
  return replay(Integer.MAX_VALUE, ttl, timer);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further {@link Subscriber}.
 * Will retain up to the given history size onNext signals. Completion and Error will also be
 * replayed.
 * <p>
 *     Note that {@code cache(0)} will only cache the terminal signal without
 *     expiration.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithHistoryLimitForFlux.svg" alt="">
 *
 * @param history number of elements retained in cache
 *
 * @return a replaying {@link Flux}
 *
 */
public final Flux<T> cache(int history) {
  return replay(history).autoConnect();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Turn this {@link Flux} into a connectable hot source and cache last emitted signals
 * for further {@link Subscriber}. Will retain up to the given history size onNext
 * signals with a per-item ttl.
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/replayWithHistoryAndTtl.svg" alt="">
 *
 * @param history number of events retained in history excluding complete and error
 * @param ttl Per-item and post termination timeout duration
 *
 * @return a replaying {@link ConnectableFlux}
 */
public final ConnectableFlux<T> replay(int history, Duration ttl) {
  return replay(history, ttl, Schedulers.parallel());
}

代码示例来源:origin: reactor/reactor-core

/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain up to the given history size and apply a per-item expiry
 * timeout.
 * <p>
 *   Completion and Error will also be replayed until {@code ttl} triggers in which case
 *   the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlAndMaxLimitForFlux.svg"
 * alt="">
 *
 * @param history number of elements retained in cache
 * @param ttl Time-to-live for each cached item and post termination.
 * @param timer the {@link Scheduler} on which to measure the duration.
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache(int history, Duration ttl, Scheduler timer) {
  return replay(history, ttl, timer).autoConnect();
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_touchAndAssertState() {
  return Arrays.asList(
      scenario(f -> f.replay().autoConnect())
  );
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Arrays.asList(
      scenario(f -> f.replay().autoConnect()),
      scenario(f -> f.replay().refCount())
  );
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void failPrefetch(){
  Flux.never()
    .replay( -1);
}

代码示例来源:origin: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void failTime(){
  Flux.never()
    .replay( Duration.ofDays(-1));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancelDoesntTriggerDisconnectErrorOnFirstSubscribeNoComplete() {
  AtomicInteger nextCount = new AtomicInteger();
  AtomicReference<Throwable> errorRef = new AtomicReference<>();
  Flux<String> flux = Flux.<String>create(sink -> {
    sink.next("test");
  })
      .replay(1)
      .refCount(1);
  flux.subscribe(v -> nextCount.incrementAndGet(), errorRef::set);
  flux.next().subscribe(v -> nextCount.incrementAndGet(), errorRef::set);
  assertThat(nextCount).hasValue(2);
  assertThat(errorRef).hasValue(null);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cancelDoesntTriggerDisconnectErrorOnFirstSubscribeDoComplete() {
  AtomicInteger nextCount = new AtomicInteger();
  AtomicReference<Throwable> errorRef = new AtomicReference<>();
  Flux<String> flux = Flux.<String>create(sink -> {
    sink.next("test");
    sink.complete();
  })
      .replay(1)
      .refCount(1);
  flux.subscribe(v -> nextCount.incrementAndGet(), errorRef::set);
  flux.next().subscribe(v -> nextCount.incrementAndGet(), errorRef::set);
  assertThat(nextCount).hasValue(2);
  assertThat(errorRef).hasValue(null);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testFusion() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .replay()
              .refCount(1, Duration.ofSeconds(1)))
        .expectFusion()
        .expectNext(1, 2, 3)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void shouldNotRetainSubscriptionToSourceWhenComplete() throws Exception {
  VirtualTimeScheduler scheduler = VirtualTimeScheduler.create();
  Duration gracePeriod = Duration.ofMillis(10);
  Flux<String> f = Flux.just("hello world")
             .replay(1)
             .refCount(1, gracePeriod, scheduler);
  AssertSubscriber<String> s = AssertSubscriber.create();
  f.subscribe(s);
  scheduler.advanceTimeBy(gracePeriod);
  StepVerifier.create(f.next())
        .expectNext("hello world")
        .verifyComplete();
  scheduler.advanceTimeBy(gracePeriod);
  s.assertValueCount(1).assertNoError().assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cacheFluxHistoryTTL() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(2, Duration.ofMillis(2000))
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cacheFluxTTL() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(Duration.ofMillis(2000))
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cacheFluxTTLMillis() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(Duration.ofMillis(2000), vts)
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cacheFluxTTLFused() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(Duration.ofMillis(2000))
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cacheFluxHistoryTTLFused() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay(2, Duration.ofMillis(2000))
                       .autoConnect()
                       .elapsed()
      .log();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .expectFusion(Fuseable.ANY)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void cacheFlux() {
  Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                       .delayElements(Duration.ofMillis(1000))
                       .replay()
                       .autoConnect()
                       .elapsed();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
        .verifyComplete();
  StepVerifier.create(source)
        .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
        .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
        .verifyComplete();
}

相关文章

Flux类方法