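This article collects code examples for the Java method reactor.core.publisher.Flux.log() and shows how Flux.log() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flux.log() method are as follows: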
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: log
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level#INFO and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "reactor.Flux.", followed by a suffix generated from the source operator, e.g. "reactor.Flux.Map".
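To make "observe all Reactive Streams signals and trace them" concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces and java.util.logging. It is not Reactor's implementation: the names SignalLogSketch, LoggingSubscriber, fromList, and the category "sketch.Flux.Just" are hypothetical and exist only for illustration. The idea it shows is that a log-style operator sits between the source and the subscriber, records each signal under a logger category at a chosen level, and passes the signal on unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch, NOT Reactor code: a pass-through subscriber that traces signals.
class SignalLogSketch {

    /** Wraps a downstream subscriber and traces every signal that passes through. */
    static final class LoggingSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private final Logger logger;
        private final Level level;
        private final List<String> trace; // kept only so the sketch can be checked

        LoggingSubscriber(Flow.Subscriber<? super T> downstream, String category,
                          Level level, List<String> trace) {
            this.downstream = downstream;
            this.logger = Logger.getLogger(category);
            this.level = level;
            this.trace = trace;
        }

        private void record(String signal) {
            trace.add(signal);
            logger.log(level, signal);
        }

        @Override
        public void onSubscribe(Flow.Subscription upstream) {
            record("onSubscribe");
            // Hand downstream a wrapped subscription so request/cancel are traced too.
            downstream.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    record("request(" + (n == Long.MAX_VALUE ? "unbounded" : String.valueOf(n)) + ")");
                    upstream.request(n);
                }

                @Override
                public void cancel() {
                    record("cancel()");
                    upstream.cancel();
                }
            });
        }

        @Override
        public void onNext(T item) {
            record("onNext(" + item + ")");
            downstream.onNext(item);
        }

        @Override
        public void onError(Throwable t) {
            record("onError(" + t + ")");
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            record("onComplete()");
            downstream.onComplete();
        }
    }

    /** Minimal synchronous publisher over a fixed list; it does not validate request(n). */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    subscriber.onNext(items.get(index++));
                }
                if (index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with unbounded demand and returns the recorded signal trace. */
    public static List<String> run() {
        List<String> trace = new ArrayList<>();
        Flow.Subscriber<Integer> sink = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
        fromList(List.of(1, 2, 3))
                .subscribe(new LoggingSubscriber<Integer>(sink, "sketch.Flux.Just", Level.INFO, trace));
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With Reactor on the classpath, the corresponding call is simply Flux.just(1, 2, 3).log(), and the overloads used in the examples that follow (a category, a level, and optional SignalType filters) narrow what gets traced.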
Code example source: codecentric/spring-boot-admin
public void start() {
    this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
        // trace all signals under this class's logger name at FINEST
        .log(log.getName(), Level.FINEST)
        .doOnSubscribe(s -> log.debug("Started reminders"))
        .flatMap(i -> this.sendReminders())
        .onErrorContinue((ex, value) -> log.warn(
            "Unexpected error while sending reminders",
            ex
        ))
        .subscribe();
}

Code example source: codecentric/spring-boot-admin
public void start() {
    subscription = Flux.from(publisher)
        .log(log.getName(), Level.FINEST)
        .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
        .ofType(eventType)
        .cast(eventType)
        .compose(this::handle)
        .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
        .subscribe();
}

Code example source: codecentric/spring-boot-admin
@Override
public void start() {
    super.start();
    intervalSubscription = Flux.interval(updateInterval)
        .doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
        .log(log.getName(), Level.FINEST)
        .subscribeOn(Schedulers.newSingle("status-monitor"))
        .concatMap(i -> this.updateStatusForAllInstances())
        .onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
            ex
        ))
        .subscribe();
}

Code example source: reactor/reactor-core
public Flux<Integer> commit() {
    System.out.println("commit");
    this.commitProbe = PublisherProbe.of(
        Flux.just(3, 2, 1)
            // log only the onNext and onComplete signals
            .log("commit method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
    return commitProbe.flux();
}

Code example source: reactor/reactor-core
@Test
public void sampleMergeMonoTest() throws Exception {
    CountDownLatch latch = new CountDownLatch(2);
    Flux<Integer> p = Flux.merge(Flux.<Integer>empty().next(), Mono.just(1))
        .log("mono");
    awaitLatch(p, latch);
}

Code example source: reactor/reactor-core
public Flux<Integer> commitError() {
    this.commitProbe = PublisherProbe.of(
        Flux.just(3, 2, 1)
            .delayElements(DELAY)
            .map(i -> 100 / (i - 1)) // results in divide by 0 when i == 1
            .log("commit method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
    return commitProbe.flux();
}

Code example source: reactor/reactor-core
public Flux<Integer> rollback() {
    this.rollbackProbe = PublisherProbe.of(
        Flux.just(5, 4, 3, 2, 1)
            .log("rollback method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
    return rollbackProbe.flux();
}

Code example source: reactor/reactor-core
@Test
public void sampleAmbTest() throws Exception {
    int elements = 40;
    CountDownLatch latch = new CountDownLatch(elements / 2 + 1);
    Flux<SensorData> p = Flux.first(sensorOdd(), sensorEven())
        .log("first");
    p.subscribe(d -> latch.countDown(), null, latch::countDown);
    Thread.sleep(1000);
    generateData(elements);
}

Code example source: reactor/reactor-core
@Test
public void cancelEvictAll() {
    StepVerifier.create(Flux.range(1, 5)
                .log()
                .onBackpressureBuffer(Duration.ofMinutes(1), Integer.MAX_VALUE, this,
                    Schedulers.single()),
            0)
        .thenAwait(Duration.ofMillis(100)) // small hiccup to cancel after the prefetch
        .thenCancel()
        .verify();
    assertThat(evicted).containsExactly(1, 2, 3, 4, 5);
}

Code example source: reactor/reactor-core
public Flux<Integer> commitDelay() {
    this.commitProbe = PublisherProbe.of(
        Flux.just(3, 2, 1)
            .delayElements(DELAY)
            .log("commit method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
    return commitProbe.flux();
}

Code example source: reactor/reactor-core
public Flux<Integer> rollbackDelay() {
    this.rollbackProbe = PublisherProbe.of(
        Flux.just(5, 4, 3, 2, 1)
            .delayElements(DELAY)
            .log("rollback method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
    return rollbackProbe.flux();
}

Code example source: reactor/reactor-core
@Test
public void sampleZipTest() throws Exception {
    int elements = 69;
    CountDownLatch latch = new CountDownLatch((elements / 2) + 1);
    Publisher<SensorData> p = Flux.zip(sensorEven(), sensorOdd(), this::computeMin)
        .log("zip");
    generateData(elements);
    awaitLatch(p, latch);
}

Code example source: reactor/reactor-core
@Test
public void sampleMergeTest() throws Exception {
    int elements = 40;
    CountDownLatch latch = new CountDownLatch(elements + 1);
    Publisher<SensorData> p = Flux.merge(sensorOdd(), sensorEven())
        .log("merge");
    generateData(elements);
    awaitLatch(p, latch);
}

Code example source: reactor/reactor-core
@Test
public void sampleZipTest2() throws Exception {
    int elements = 1;
    CountDownLatch latch = new CountDownLatch(elements + 1);
    Publisher<SensorData> p = Flux.zip(sensorEven(), Flux.just(new SensorData(1L, 14.0f)), this::computeMin)
        .log("zip2");
    generateData(elements);
    awaitLatch(p, latch);
}

Code example source: reactor/reactor-core
@Test
public void expectNextCountAfterThenConsumeWhile() {
    StepVerifier.create(Flux.range(1, 5).log())
        .thenConsumeWhile(i -> i <= 2)
        .expectNextCount(3)
        .verifyComplete();
}

Code example source: reactor/reactor-core
@Test
public void innerCancellationCancelsMainSequence() {
    StepVerifier.create(Flux.just("red", "green", "#", "black", "white")
                .log()
                .windowWhile(s -> !s.equals("#"))
                .flatMap(w -> w.take(1)))
        .expectNext("red")
        .thenCancel()
        .verify();
}

Code example source: reactor/reactor-core
@Test
public void testGreen() {
    FluxProcessor<String, String> processor = EmitterProcessor.create();
    AssertSubscriber<String> subscriber = AssertSubscriber.create(1);
    processor.subscribe(subscriber);
    Flux.fromIterable(DATA)
        .log()
        .subscribe(processor);
    subscriber.awaitAndAssertNextValues("1");
}

Code example source: reactor/reactor-core
@Test
public void mergeWithNoInterleave() throws Exception {
    // each source gets its own category; the concatenated sequence uses the default one
    Flux.concat(emitter1.log("test1"), emitter2.log("test2")).log().subscribe(ts);
    emitValues();
    ts.assertValues(1L, 3L, 2L, 4L).assertComplete();
}

Code example source: reactor/reactor-core
@Test
public void secondWinner() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.first(Flux.never(),
            Flux.range(11, 10)
                .log())
        .subscribe(ts);
    ts.assertValues(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
        .assertComplete()
        .assertNoError();
}

Code example source: reactor/reactor-core
@Test
public void verifyVirtualTimeOnComplete() {
    StepVerifier.withVirtualTime(() -> Flux.empty()
                .delaySubscription(Duration.ofHours(1))
                .log())
        .thenAwait(Duration.ofHours(1))
        .expectComplete()
        .verify();
}