Usage and code examples of the reactor.core.publisher.Flux.log() method

x33g5p2x, reposted on 2022-01-19 under Other

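This article collects code examples of the Java method reactor.core.publisher.Flux.log() and shows how Flux.log() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as references. Details of the Flux.log() method:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: log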

Introduction to Flux.log

Observe all Reactive Streams signals and trace them using Logger support. By default, Level#INFO and java.util.logging are used. If SLF4J is available, it is used instead.

The default log category is "reactor.Flux.", followed by a suffix generated from the source operator, e.g. "reactor.Flux.Map".
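
As a minimal sketch of the behaviour described above, the following self-contained class shows the no-argument form next to the overload that takes a category, a level, and a list of signal types, which is the overload used in several of the project snippets below. The class name FluxLogSketch, the method names, and the category "demo.category" are made up for illustration; they do not come from any of the quoted projects. Whether the FINE-level lines actually appear depends on how the logging backend is configured.

```java
import java.util.List;
import java.util.logging.Level;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// Hypothetical demo class, not taken from any of the quoted projects.
public class FluxLogSketch {

    // No-argument form: signals are traced at INFO level under a category
    // that starts with "reactor.Flux." plus an operator-derived suffix.
    static List<Integer> doubled() {
        return Flux.range(1, 3)
                .map(i -> i * 2)
                .log()
                .collectList()
                .block();
    }

    // Custom category and level, restricted to onNext and onComplete signals.
    // "demo.category" is an assumed name chosen for this sketch.
    static List<Integer> filteredSignals() {
        return Flux.just(3, 2, 1)
                .log("demo.category", Level.FINE, SignalType.ON_NEXT, SignalType.ON_COMPLETE)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(doubled());
        System.out.println(filteredSignals());
    }
}
```

log() only observes the sequence, so the collected values are the same as they would be without it. Its position in the chain decides which signals are traced: here it sees the values after map has been applied.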

Code examples

Code example source: codecentric/spring-boot-admin

public void start() {
  this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
              .log(log.getName(), Level.FINEST)
              .doOnSubscribe(s -> log.debug("Started reminders"))
              .flatMap(i -> this.sendReminders())
              .onErrorContinue((ex, value) -> log.warn("Unexpected error while sending reminders", ex))
              .subscribe();
}

Code example source: codecentric/spring-boot-admin

public void start() {
  subscription = Flux.from(publisher)
            .log(log.getName(), Level.FINEST)
            .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
            .ofType(eventType)
            .cast(eventType)
            .compose(this::handle)
            .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
            .subscribe();
}

Code example source: codecentric/spring-boot-admin

@Override
public void start() {
  super.start();
  intervalSubscription = Flux.interval(updateInterval)
                .doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
                .log(log.getName(), Level.FINEST)
                .subscribeOn(Schedulers.newSingle("status-monitor"))
                .concatMap(i -> this.updateStatusForAllInstances())
                .onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses", ex))
                .subscribe();
}

Code example source: reactor/reactor-core

public Flux<Integer> commit() {
  System.out.println("commit");
  this.commitProbe = PublisherProbe.of(
      Flux.just(3, 2, 1)
        .log("commit method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
  return commitProbe.flux();
}

Code example source: reactor/reactor-core

@Test
public void sampleMergeMonoTest() throws Exception {
  CountDownLatch latch = new CountDownLatch(2);
  Flux<Integer> p = Flux.merge(Flux.<Integer>empty().next(), Mono.just(1))
                 .log("mono");
  awaitLatch(p, latch);
}

Code example source: reactor/reactor-core

public Flux<Integer> commitError() {
  this.commitProbe = PublisherProbe.of(
      Flux.just(3, 2, 1)
        .delayElements(DELAY)
        .map(i -> 100 / (i - 1)) //results in divide by 0
        .log("commit method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
  return commitProbe.flux();
}

Code example source: reactor/reactor-core

public Flux<Integer> rollback() {
  this.rollbackProbe = PublisherProbe.of(
      Flux.just(5, 4, 3, 2, 1)
        .log("rollback method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
  return rollbackProbe.flux();
}

Code example source: reactor/reactor-core

@Test
public void sampleAmbTest() throws Exception {
  int elements = 40;
  CountDownLatch latch = new CountDownLatch(elements / 2 + 1);
  Flux<SensorData> p = Flux.first(sensorOdd(), sensorEven())
                 .log("first");
  p.subscribe(d -> latch.countDown(), null, latch::countDown);
  Thread.sleep(1000);
  generateData(elements);
}

Code example source: reactor/reactor-core

@Test
public void cancelEvictAll() {
  StepVerifier.create(Flux.range(1, 5)
              .log()
              .onBackpressureBuffer(Duration.ofMinutes(1), Integer.MAX_VALUE, this,
                  Schedulers.single()),
      0)
        .thenAwait(Duration.ofMillis(100)) // small hiccup to cancel after the prefetch
        .thenCancel()
        .verify();
  assertThat(evicted).containsExactly(1, 2, 3, 4, 5);
}

Code example source: reactor/reactor-core

public Flux<Integer> commitDelay() {
  this.commitProbe = PublisherProbe.of(
      Flux.just(3, 2, 1)
        .delayElements(DELAY)
        .log("commit method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
  return commitProbe.flux();
}

Code example source: reactor/reactor-core

public Flux<Integer> rollbackDelay() {
  this.rollbackProbe = PublisherProbe.of(
      Flux.just(5, 4, 3, 2, 1)
        .delayElements(DELAY)
        .log("rollback method used", level, SignalType.ON_NEXT, SignalType.ON_COMPLETE));
  return rollbackProbe.flux();
}

Code example source: reactor/reactor-core

@Test
public void sampleZipTest() throws Exception {
  int elements = 69;
  CountDownLatch latch = new CountDownLatch((elements / 2) + 1);
  Publisher<SensorData> p = Flux.zip(sensorEven(), sensorOdd(), this::computeMin)
                 .log("zip");
  generateData(elements);
  awaitLatch(p, latch);
}

Code example source: reactor/reactor-core

@Test
public void sampleMergeTest() throws Exception {
  int elements = 40;
  CountDownLatch latch = new CountDownLatch(elements + 1);
  Publisher<SensorData> p = Flux.merge(sensorOdd(), sensorEven())
                 .log("merge");
  generateData(elements);
  awaitLatch(p, latch);
}

Code example source: reactor/reactor-core

@Test
public void sampleZipTest2() throws Exception {
  int elements = 1;
  CountDownLatch latch = new CountDownLatch(elements + 1);
  Publisher<SensorData> p = Flux.zip(sensorEven(), Flux.just(new SensorData(1L, 14.0f)), this::computeMin)
                 .log("zip2");
  generateData(elements);
  awaitLatch(p, latch);
}

Code example source: reactor/reactor-core

@Test
public void expectNextCountAfterThenConsumeWhile() {
  StepVerifier.create(Flux.range(1, 5).log())
        .thenConsumeWhile(i -> i <= 2)
        .expectNextCount(3)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void innerCancellationCancelsMainSequence() {
  StepVerifier.create(Flux.just("red", "green", "#", "black", "white")
              .log()
              .windowWhile(s -> !s.equals("#"))
              .flatMap(w -> w.take(1)))
        .expectNext("red")
        .thenCancel()
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void testGreen() {
  FluxProcessor<String, String> processor = EmitterProcessor.create();
  AssertSubscriber<String> subscriber = AssertSubscriber.create(1);
  processor.subscribe(subscriber);
  Flux.fromIterable(DATA)
    .log()
    .subscribe(processor);
  subscriber.awaitAndAssertNextValues("1");
}

Code example source: reactor/reactor-core

@Test
public void mergeWithNoInterleave() throws Exception {
  Flux.concat(emitter1.log("test1"), emitter2.log("test2")).log().subscribe(ts);
  emitValues();
  ts.assertValues(1L, 3L, 2L, 4L).assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void secondWinner() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.first(Flux.never(),
      Flux.range(11, 10)
        .log())
    .subscribe(ts);
  ts.assertValues(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void verifyVirtualTimeOnComplete() {
  StepVerifier.withVirtualTime(() -> Flux.empty()
                      .delaySubscription(Duration.ofHours(1))
                      .log())
        .thenAwait(Duration.ofHours(1))
        .expectComplete()
        .verify();
}
