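This article collects code examples for the Java method reactor.core.publisher.Flux.replay() and shows how Flux.replay() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.replay() method are as follows: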
Package: reactor.core.publisher
Class name: Flux
Method name: replay
Description: Turn this Flux into a hot source and cache the last emitted signals for later Subscribers. An unbounded number of onNext signals is retained. Completion and error signals are also replayed.
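To make the description above concrete, here is a minimal plain-Java sketch of the replay idea. It is a simplified model, not Reactor code: the class `ReplayModel` and its methods are hypothetical names introduced for illustration, and it ignores completion, errors, backpressure, and threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of an unbounded replay buffer (hypothetical, not Reactor's implementation). */
public class ReplayModel<T> {
    private final List<T> history = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    /** A new subscriber first receives every buffered value, then live values. */
    public void subscribe(Consumer<T> subscriber) {
        history.forEach(subscriber);
        subscribers.add(subscriber);
    }

    /** Emitting records the value and pushes it to all current subscribers. */
    public void emit(T value) {
        history.add(value);
        for (Consumer<T> s : subscribers) {
            s.accept(value);
        }
    }

    /** Returns the order in which an early and a late subscriber see the values. */
    public static List<String> demo() {
        ReplayModel<Integer> source = new ReplayModel<>();
        List<String> log = new ArrayList<>();
        source.subscribe(v -> log.add("early:" + v));
        source.emit(1);
        source.emit(2);
        source.subscribe(v -> log.add("late:" + v)); // replays 1 and 2 immediately
        source.emit(3);
        return log;
    }
}
```

Calling `ReplayModel.demo()` yields `early:1, early:2, late:1, late:2, early:3, late:3`: the late subscriber catches up on the buffered values before receiving new ones.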
Code example source: origin: reactor/reactor-core
/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further {@link Subscriber}. Will
 * retain an unbounded amount of onNext signals. Completion and Error will also be
 * replayed.
 * <p>
 * <img class="marble" src="doc-files/marbles/replay.svg" alt="">
 *
 * @return a replaying {@link ConnectableFlux}
 */
public final ConnectableFlux<T> replay() {
    return replay(Integer.MAX_VALUE);
}
Code example source: origin: reactor/reactor-core
/**
 * Turn this {@link Flux} into a connectable hot source and cache last emitted signals
 * for further {@link Subscriber}. Will retain each onNext up to the given per-item
 * expiry timeout.
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/replayWithTtl.svg" alt="">
 *
 * @param ttl Per-item and post termination timeout duration
 *
 * @return a replaying {@link ConnectableFlux}
 */
public final ConnectableFlux<T> replay(Duration ttl) {
    return replay(Integer.MAX_VALUE, ttl);
}
Code example source: origin: reactor/reactor-core
/**
 * Turn this {@link Flux} into a connectable hot source and cache last emitted signals
 * for further {@link Subscriber}. Will retain onNext signal for up to the given
 * {@link Duration} with a per-item ttl.
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription
 * <p>
 * <img class="marble" src="doc-files/marbles/replayWithTtl.svg" alt="">
 *
 * @param ttl Per-item and post termination timeout duration
 * @param timer a time-capable {@link Scheduler} instance to read current time from
 *
 * @return a replaying {@link ConnectableFlux}
 */
public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer) {
    return replay(Integer.MAX_VALUE, ttl, timer);
}
Code example source: origin: reactor/reactor-core
/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further {@link Subscriber}.
 * Will retain up to the given history size onNext signals. Completion and Error will also be
 * replayed.
 * <p>
 * Note that {@code cache(0)} will only cache the terminal signal without
 * expiration.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithHistoryLimitForFlux.svg" alt="">
 *
 * @param history number of elements retained in cache
 *
 * @return a replaying {@link Flux}
 *
 */
public final Flux<T> cache(int history) {
    return replay(history).autoConnect();
}
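The history limit described in the Javadoc above can be pictured as a bounded queue that drops its oldest element once it is full. The following is a simplified plain-Java model rather than Reactor's implementation: `BoundedReplayModel` and its methods are hypothetical names, and rejecting a negative limit is an assumption chosen to mirror the `failPrefetch` test on this page.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified model of a size-bounded replay buffer (hypothetical, not Reactor's implementation). */
public class BoundedReplayModel<T> {
    private final int limit;
    private final Deque<T> history = new ArrayDeque<>();

    public BoundedReplayModel(int limit) {
        // Assumption: a negative history size is rejected up front.
        if (limit < 0) {
            throw new IllegalArgumentException("history must not be negative");
        }
        this.limit = limit;
    }

    /** Stores the value and evicts the oldest entries beyond the limit. */
    public void emit(T value) {
        history.addLast(value);
        while (history.size() > limit) {
            history.removeFirst();
        }
    }

    /** What a subscriber arriving now would receive as replayed values. */
    public List<T> replayForLateSubscriber() {
        return new ArrayList<>(history);
    }

    /** Emits 1, 2, 3 into a buffer of size 2 and returns the replayed values. */
    public static List<Integer> demo() {
        BoundedReplayModel<Integer> model = new BoundedReplayModel<>(2);
        model.emit(1);
        model.emit(2);
        model.emit(3);
        return model.replayForLateSubscriber();
    }
}
```

With a limit of 2, `BoundedReplayModel.demo()` returns `[2, 3]`, which matches the values the second subscriber expects in the `replay(2, ...)` tests further down.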
Code example source: origin: reactor/reactor-core
/**
 * Turn this {@link Flux} into a connectable hot source and cache last emitted signals
 * for further {@link Subscriber}. Will retain up to the given history size onNext
 * signals with a per-item ttl.
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/replayWithHistoryAndTtl.svg" alt="">
 *
 * @param history number of events retained in history excluding complete and error
 * @param ttl Per-item and post termination timeout duration
 *
 * @return a replaying {@link ConnectableFlux}
 */
public final ConnectableFlux<T> replay(int history, Duration ttl) {
    return replay(history, ttl, Schedulers.parallel());
}
Code example source: origin: reactor/reactor-core
/**
 * Turn this {@link Flux} into a hot source and cache last emitted signals for further
 * {@link Subscriber}. Will retain up to the given history size and apply a per-item expiry
 * timeout.
 * <p>
 * Completion and Error will also be replayed until {@code ttl} triggers in which case
 * the next {@link Subscriber} will start over a new subscription.
 * <p>
 * <img class="marble" src="doc-files/marbles/cacheWithTtlAndMaxLimitForFlux.svg"
 * alt="">
 *
 * @param history number of elements retained in cache
 * @param ttl Time-to-live for each cached item and post termination.
 * @param timer the {@link Scheduler} on which to measure the duration.
 *
 * @return a replaying {@link Flux}
 */
public final Flux<T> cache(int history, Duration ttl, Scheduler timer) {
    return replay(history, ttl, timer).autoConnect();
}
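The per-item ttl and the `timer` parameter can be illustrated with a small plain-Java model in which every value is stamped with the time read from an injected clock. This is a sketch under stated assumptions, not Reactor's implementation: `TtlReplayModel` is a hypothetical name, the `LongSupplier` clock stands in for the `Scheduler`, and the boundary rule (an item is dropped once its age reaches the ttl) is an assumption chosen so that the result agrees with the ttl tests on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Simplified model of a time-bounded replay buffer (hypothetical, not Reactor's implementation). */
public class TtlReplayModel<T> {
    /** A value together with the clock reading at emission time. */
    private static final class Stamped<V> {
        final V value;
        final long time;

        Stamped(V value, long time) {
            this.value = value;
            this.time = time;
        }
    }

    private final List<Stamped<T>> history = new ArrayList<>();
    private final long ttlMillis;
    private final LongSupplier clock; // plays the role of the timer argument

    public TtlReplayModel(long ttlMillis, LongSupplier clock) {
        // Assumption: a negative ttl is rejected, mirroring the failTime test.
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttl must not be negative");
        }
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Stores the value with the current clock reading. */
    public void emit(T value) {
        history.add(new Stamped<>(value, clock.getAsLong()));
    }

    /** Values a subscriber arriving now would see: only those younger than the ttl. */
    public List<T> replayForLateSubscriber() {
        long now = clock.getAsLong();
        List<T> out = new ArrayList<>();
        for (Stamped<T> s : history) {
            if (now - s.time < ttlMillis) {
                out.add(s.value);
            }
        }
        return out;
    }

    /** Emits 1, 2, 3 at t = 1000, 2000, 3000 ms with a 2000 ms ttl, then subscribes at t = 3000. */
    public static List<Integer> demo() {
        long[] now = {0L};
        TtlReplayModel<Integer> model = new TtlReplayModel<>(2000, () -> now[0]);
        for (int i = 1; i <= 3; i++) {
            now[0] = i * 1000L;
            model.emit(i);
        }
        return model.replayForLateSubscriber();
    }
}
```

`TtlReplayModel.demo()` returns `[2, 3]`: the first value is 2000 ms old at subscription time and is treated as expired. Injecting the clock is what makes this deterministic, which is the same reason the tests below drive time through a virtual scheduler.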
Code example source: origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_touchAndAssertState() {
    return Arrays.asList(
            scenario(f -> f.replay().autoConnect())
    );
}
Code example source: origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
    return Arrays.asList(
            scenario(f -> f.replay().autoConnect()),
            scenario(f -> f.replay().refCount())
    );
}
Code example source: origin: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void failPrefetch() {
    Flux.never()
        .replay(-1);
}
Code example source: origin: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void failTime() {
    Flux.never()
        .replay(Duration.ofDays(-1));
}
Code example source: origin: reactor/reactor-core
@Test
public void cancelDoesntTriggerDisconnectErrorOnFirstSubscribeNoComplete() {
    AtomicInteger nextCount = new AtomicInteger();
    AtomicReference<Throwable> errorRef = new AtomicReference<>();
    Flux<String> flux = Flux.<String>create(sink -> {
                                sink.next("test");
                            })
                            .replay(1)
                            .refCount(1);
    flux.subscribe(v -> nextCount.incrementAndGet(), errorRef::set);
    flux.next().subscribe(v -> nextCount.incrementAndGet(), errorRef::set);
    assertThat(nextCount).hasValue(2);
    assertThat(errorRef).hasValue(null);
}
Code example source: origin: reactor/reactor-core
@Test
public void cancelDoesntTriggerDisconnectErrorOnFirstSubscribeDoComplete() {
    AtomicInteger nextCount = new AtomicInteger();
    AtomicReference<Throwable> errorRef = new AtomicReference<>();
    Flux<String> flux = Flux.<String>create(sink -> {
                                sink.next("test");
                                sink.complete();
                            })
                            .replay(1)
                            .refCount(1);
    flux.subscribe(v -> nextCount.incrementAndGet(), errorRef::set);
    flux.next().subscribe(v -> nextCount.incrementAndGet(), errorRef::set);
    assertThat(nextCount).hasValue(2);
    assertThat(errorRef).hasValue(null);
}
Code example source: origin: reactor/reactor-core
@Test
public void testFusion() {
    StepVerifier.create(Flux.just(1, 2, 3)
                            .replay()
                            .refCount(1, Duration.ofSeconds(1)))
                .expectFusion()
                .expectNext(1, 2, 3)
                .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void shouldNotRetainSubscriptionToSourceWhenComplete() throws Exception {
    VirtualTimeScheduler scheduler = VirtualTimeScheduler.create();
    Duration gracePeriod = Duration.ofMillis(10);
    Flux<String> f = Flux.just("hello world")
                         .replay(1)
                         .refCount(1, gracePeriod, scheduler);
    AssertSubscriber<String> s = AssertSubscriber.create();
    f.subscribe(s);
    scheduler.advanceTimeBy(gracePeriod);
    StepVerifier.create(f.next())
                .expectNext("hello world")
                .verifyComplete();
    scheduler.advanceTimeBy(gracePeriod);
    s.assertValueCount(1).assertNoError().assertComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void cacheFluxHistoryTTL() {
    // vts is presumably a VirtualTimeScheduler field of the enclosing test class (not shown in this excerpt)
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000))
                                             .replay(2, Duration.ofMillis(2000))
                                             .autoConnect()
                                             .elapsed();
    StepVerifier.create(source)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.create(source)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void cacheFluxTTL() {
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000))
                                             .replay(Duration.ofMillis(2000))
                                             .autoConnect()
                                             .elapsed();
    StepVerifier.create(source)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.create(source)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void cacheFluxTTLMillis() {
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000))
                                             .replay(Duration.ofMillis(2000), vts)
                                             .autoConnect()
                                             .elapsed();
    StepVerifier.create(source)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.create(source)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void cacheFluxTTLFused() {
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000))
                                             .replay(Duration.ofMillis(2000))
                                             .autoConnect()
                                             .elapsed();
    StepVerifier.create(source)
                .expectFusion(Fuseable.ANY)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.create(source)
                .expectFusion(Fuseable.ANY)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void cacheFluxHistoryTTLFused() {
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000))
                                             .replay(2, Duration.ofMillis(2000))
                                             .autoConnect()
                                             .elapsed()
                                             .log();
    StepVerifier.create(source)
                .expectFusion(Fuseable.ANY)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.create(source)
                .expectFusion(Fuseable.ANY)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}
Code example source: origin: reactor/reactor-core
@Test
public void cacheFlux() {
    Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
                                             .delayElements(Duration.ofMillis(1000))
                                             .replay()
                                             .autoConnect()
                                             .elapsed();
    StepVerifier.create(source)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
                .verifyComplete();
    StepVerifier.create(source)
                .then(() -> vts.advanceTimeBy(Duration.ofSeconds(3)))
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 1)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 2)
                .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
                .verifyComplete();
}