Usage and code examples of the reactor.core.publisher.Flux.doOnSubscribe() method

Reposted by x33g5p2x on 2022-01-19, filed under Other

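This article collects code examples of the Java method reactor.core.publisher.Flux.doOnSubscribe() and shows how it is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of Flux.doOnSubscribe() are as follows:
Package: reactor.core.publisher
Class name: Flux
Method name: doOnSubscribe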

Introduction to Flux.doOnSubscribe

Add behavior (side-effect) triggered when the Flux is subscribed.

This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable#dispose() on the Disposable returned by Flux#subscribe().
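To make the "side effect on subscription" idea concrete, here is a minimal sketch that models the hook with the JDK's own java.util.concurrent.Flow interfaces instead of Reactor. It is not Reactor's implementation: the class DoOnSubscribeSketch and the helpers doOnSubscribe, range, and collector are hypothetical names introduced only for this illustration. It assumes a synchronous source and a subscriber that requests everything at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical stdlib-only model of a doOnSubscribe-style hook (not Reactor code).
final class DoOnSubscribeSketch {

    // Wraps a publisher so that `hook` runs every time a subscriber is attached.
    static <T> Flow.Publisher<T> doOnSubscribe(Flow.Publisher<T> source,
                                               Consumer<? super Flow.Subscription> hook) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                hook.accept(s);            // run the side effect first
                downstream.onSubscribe(s); // then hand the same subscription on unchanged
            }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    // Simplified synchronous source emitting `count` integers starting at `start`.
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            boolean done;
            @Override public void request(long n) {
                for (long i = 0; i < n && !done; i++) {
                    if (next < start + count) {
                        subscriber.onNext(next++);
                    }
                    if (next >= start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    // Subscriber that requests everything and records each signal in `log`.
    static Flow.Subscriber<Integer> collector(List<String> log) {
        return new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { log.add("next " + item); }
            @Override public void onError(Throwable t) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        };
    }

    // Builds the pipeline, then subscribes twice; the hook fires once per subscription.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<Integer> flux = doOnSubscribe(range(1, 3), s -> log.add("subscribed"));
        log.add("assembled"); // the hook has not run yet: nothing happens until subscribe
        flux.subscribe(collector(log));
        flux.subscribe(collector(log));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [assembled, subscribed, next 1, next 2, next 3, complete, subscribed, next 1, next 2, next 3, complete]. The hook only observes the subscription and passes it on untouched, which matches the monitoring-style use described above; in real Reactor code, cancellation would go through the Disposable returned by Flux#subscribe() rather than through the captured subscription.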

Code examples

Code example source: origin: reactor/reactor-core

@Override
protected Flux<O> doOnSubscribe(Flux<O> output,
    Consumer<? super Subscription> doOnSubscribe) {
  return output.doOnSubscribe(doOnSubscribe);
}

Code example source: origin: codecentric/spring-boot-admin

public void start() {
  this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
              .log(log.getName(), Level.FINEST)
              .doOnSubscribe(s -> log.debug("Started reminders"))
              .flatMap(i -> this.sendReminders())
              .onErrorContinue((ex, value) -> log.warn(
                "Unexpected error while sending reminders",
                ex
              ))
              .subscribe();
}

Code example source: origin: codecentric/spring-boot-admin

public void start() {
  subscription = Flux.from(publisher)
            .log(log.getName(), Level.FINEST)
            .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
            .ofType(eventType)
            .cast(eventType)
            .compose(this::handle)
            .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
            .subscribe();
}

Code example source: origin: spring-projects/spring-framework

public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
    @Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
  if (publisher != null && publisherNested != null) {
    throw new IllegalArgumentException("At most one publisher expected");
  }
  this.publisher = publisher != null ?
      Flux.from(publisher)
          .doOnSubscribe(s -> this.hasContentConsumer = true)
          .doOnNext(this.buffer::write)
          .doOnError(this::handleOnError)
          .doOnCancel(this::handleOnComplete)
          .doOnComplete(this::handleOnComplete) : null;
  this.publisherNested = publisherNested != null ?
      Flux.from(publisherNested)
          .doOnSubscribe(s -> this.hasContentConsumer = true)
          .map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError))
          .doOnError(this::handleOnError)
          .doOnCancel(this::handleOnComplete)
          .doOnComplete(this::handleOnComplete) : null;
  if (publisher == null && publisherNested == null) {
    this.content.onComplete();
  }
}

Code example source: origin: codecentric/spring-boot-admin

@Override
public void start() {
  super.start();
  intervalSubscription = Flux.interval(updateInterval)
                .doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
                .log(log.getName(), Level.FINEST)
                .subscribeOn(Schedulers.newSingle("status-monitor"))
                .concatMap(i -> this.updateStatusForAllInstances())
                .onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses",
                  ex
                ))
                .subscribe();
}

Code example source: origin: reactor/reactor-core

@Test
public void directOtherErrorPreventsSubscribe() {
  AtomicBoolean sourceSubscribed = new AtomicBoolean();
  AtomicBoolean sourceCancelled = new AtomicBoolean();
  Flux<Integer> source = justError
                .doOnSubscribe(sub -> sourceSubscribed.set(true))
                .doOnCancel(() -> sourceCancelled.set(true));
  Flux<Integer> retry = source.retryWhen(other -> Mono.error(new IllegalStateException("boom")));
  StepVerifier.create(retry)
        .expectSubscription()
        .verifyErrorMessage("boom");
  assertThat(sourceSubscribed.get()).isFalse();
  assertThat(sourceCancelled.get()).isFalse();
}

Code example source: origin: reactor/reactor-core

@Test
public void directOtherEmptyPreventsSubscribeAndCompletes() {
  AtomicBoolean sourceSubscribed = new AtomicBoolean();
  AtomicBoolean sourceCancelled = new AtomicBoolean();
  Flux<Integer> source = justError
                .doOnSubscribe(sub -> sourceSubscribed.set(true))
                .doOnCancel(() -> sourceCancelled.set(true));
  Flux<Integer> retry = source.retryWhen(other -> Flux.empty());
  StepVerifier.create(retry)
        .expectSubscription()
        .verifyComplete();
  assertThat(sourceSubscribed.get()).isFalse();
  assertThat(sourceCancelled.get()).isFalse();
}

Code example source: origin: reactor/reactor-core

@Test
public void directOtherEmptyPreventsSubscribe() {
  AtomicBoolean sourceSubscribed = new AtomicBoolean();
  AtomicBoolean sourceCancelled = new AtomicBoolean();
  Flux<Integer> source = Flux.just(1)
                .doOnSubscribe(sub -> sourceSubscribed.set(true))
                .doOnCancel(() -> sourceCancelled.set(true));
  Flux<Integer> repeat = source.repeatWhen(other -> Flux.empty());
  StepVerifier.create(repeat)
        .expectSubscription()
        .verifyComplete();
  assertThat(sourceSubscribed.get()).isFalse();
  assertThat(sourceCancelled.get()).isFalse();
}

Code example source: origin: reactor/reactor-core

@Test
public void directOtherErrorPreventsSubscribe() {
  AtomicBoolean sourceSubscribed = new AtomicBoolean();
  AtomicBoolean sourceCancelled = new AtomicBoolean();
  Flux<Integer> source = Flux.just(1)
                .doOnSubscribe(sub -> sourceSubscribed.set(true))
                .doOnCancel(() -> sourceCancelled.set(true));
  Flux<Integer> repeat = source.repeatWhen(other -> Mono.error(new IllegalStateException("boom")));
  StepVerifier.create(repeat)
        .expectSubscription()
        .verifyErrorMessage("boom");
  assertThat(sourceSubscribed.get()).isFalse();
  assertThat(sourceCancelled.get()).isFalse();
}

Code example source: origin: reactor/reactor-core

@Override
public Flux<T> flux() {
  return Flux.from(delegate)
        .doOnSubscribe(sub -> incrementAndGet(SUBSCRIBED))
        .doOnCancel(() -> incrementAndGet(CANCELLED))
        .doOnRequest(l -> incrementAndGet(REQUESTED));
}

Code example source: origin: reactor/reactor-core

@Test
public void advancedConnectable() throws InterruptedException {
  Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("subscribed to source"));
  ConnectableFlux<Integer> co = source.publish();
  co.subscribe(System.out::println, e -> {}, () -> {});
  co.subscribe(System.out::println, e -> {}, () -> {});
  System.out.println("done subscribing");
  Thread.sleep(500);
  System.out.println("will now connect");
  co.connect();
}

Code example source: origin: reactor/reactor-core

@Test
public void advancedConnectableAutoConnect() throws InterruptedException {
  Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("subscribed to source"));
  Flux<Integer> autoCo = source.publish().autoConnect(2);
  autoCo.subscribe(System.out::println, e -> {}, () -> {});
  System.out.println("subscribed first");
  Thread.sleep(500);
  System.out.println("subscribing second");
  autoCo.subscribe(System.out::println, e -> {}, () -> {});
}

Code example source: origin: reactor/reactor-core

@Test
public void lateOtherErrorCancelsSource() {
  AtomicBoolean sourceSubscribed = new AtomicBoolean();
  AtomicBoolean sourceCancelled = new AtomicBoolean();
  AtomicInteger count = new AtomicInteger();
  Flux<Integer> source = justError
                .doOnSubscribe(sub -> sourceSubscribed.set(true))
                .doOnCancel(() -> sourceCancelled.set(true));
  Flux<Integer> retry = source.retryWhen(other -> other.flatMap(l ->
      count.getAndIncrement() == 0 ? Mono.just(l) : Mono.<Long>error(new IllegalStateException("boom"))));
  StepVerifier.create(retry)
        .expectSubscription()
        .expectNext(1)
        .expectNext(1)
        .verifyErrorMessage("boom");
  assertThat(sourceSubscribed.get()).isTrue();
  assertThat(sourceCancelled.get()).isTrue();
}

Code example source: origin: reactor/reactor-core

@Test
public void lateOtherEmptyCancelsSourceAndCompletes() {
  AtomicBoolean sourceSubscribed = new AtomicBoolean();
  AtomicBoolean sourceCancelled = new AtomicBoolean();
  Flux<Integer> source = justError
                .doOnSubscribe(sub -> sourceSubscribed.set(true))
                .doOnCancel(() -> sourceCancelled.set(true));
  Flux<Integer> retry = source.retryWhen(other -> other.take(1));
  StepVerifier.create(retry)
        .expectSubscription()
        .expectNext(1) //original
        .expectNext(1) //retry
        .verifyComplete(); //retry terminated
  assertThat(sourceSubscribed.get()).isTrue();
  assertThat(sourceCancelled.get()).isTrue();
}

Code example source: origin: reactor/reactor-core

@Test
public void lateOtherEmptyCancelsSource() {
  AtomicBoolean sourceSubscribed = new AtomicBoolean();
  AtomicBoolean sourceCancelled = new AtomicBoolean();
  Flux<Integer> source = Flux.just(1)
                .doOnSubscribe(sub -> sourceSubscribed.set(true))
                .doOnCancel(() -> sourceCancelled.set(true));
  Flux<Integer> repeat = source.repeatWhen(other -> other.take(1));
  StepVerifier.create(repeat)
        .expectSubscription()
        .expectNext(1) //original
        .expectNext(1) //repeat
        .verifyComplete(); //repeat terminated
  assertThat(sourceSubscribed.get()).isTrue();
  assertThat(sourceCancelled.get()).isTrue();
}

Code example source: origin: reactor/reactor-core

@Test
public void introspectionCancel() {
  AtomicReference<Scannable> scannable = new AtomicReference<>();
  Flux<Integer> flux = Flux.range(1, 10).concatWith(Mono.error(new IllegalStateException("boom")))
               .filterWhen(i -> Mono.just(i % 2 == 0), 3)
               .doOnSubscribe(sub -> {
                 assertThat(sub).isInstanceOf(Scannable.class);
                 scannable.set((Scannable) sub);
               });
  StepVerifier.create(flux, 0)
        .thenRequest(1)
        .expectNext(2)
        .then(() -> assertThat(scannable.get().scan(Scannable.Attr.CANCELLED)).isEqualTo(false))
        .thenCancel()
        .verify();
  assertThat(scannable.get().scan(Scannable.Attr.CANCELLED)).isEqualTo(true);
}

Code example source: origin: reactor/reactor-core

@Test
public void evictCancels() {
  AtomicReference<Subscription> subscription = new AtomicReference<>();
  TestPublisher<Integer> tp = TestPublisher.create();
  StepVerifier.withVirtualTime(() -> tp.flux()
                     .doOnSubscribe(subscription::set)
                     .onBackpressureBuffer(Duration.ofSeconds(1), 10, i -> {
                       evicted.add(i);
                       subscription.get().cancel();
                     }, VirtualTimeScheduler.get()),
      0)
        .then(() -> tp.emit(1, 2, 3, 4, 5))
        .thenAwait(Duration.ofMinutes(1))
        .verifyComplete();
  tp.assertCancelled();
  assertThat(evicted).containsExactly(1, 2, 3, 4, 5);
}

Code example source: origin: reactor/reactor-core

@Test
public void subscribeInnerOnce() {
  LongAdder innerSub1 = new LongAdder();
  LongAdder innerSub2 = new LongAdder();
  Flux<Integer> source1 = Flux.range(1, 5)
                .doOnSubscribe((t) -> innerSub1.increment());
  Flux<Integer> source2 = Flux.just(1, 2, 3, 7, 8)
                .doOnSubscribe((t) -> innerSub2.increment());
  Mono.sequenceEqual(source1, source2)
    .subscribe();
  Assert.assertEquals("left has been subscribed multiple times", 1, innerSub1.intValue());
  Assert.assertEquals("right has been subscribed multiple times", 1, innerSub2.intValue());
}

Code example source: origin: reactor/reactor-core

@Test
public void sourceSubscribedOnce() {
  AtomicInteger subCount = new AtomicInteger();
  Flux<Integer> source = Flux.range(1, 10)
                .hide()
                .doOnSubscribe(subscription -> subCount.incrementAndGet());
  StepVerifier.create(source.switchOnFirst((s, f) -> f.filter(v -> v % 2 == s.get())))
        .expectNext(1, 3, 5, 7, 9)
        .verifyComplete();
  Assertions.assertThat(subCount).hasValue(1);
}

Code example source: origin: reactor/reactor-core

@Test
public void takeFusedBackpressuredCancelled() {
  UnicastProcessor<String> up = UnicastProcessor.create();
  StepVerifier.create(up.take(3).doOnSubscribe(s -> {
    assertThat(((Fuseable.QueueSubscription)s).size()).isEqualTo(0);
  }), 0)
        .expectFusion()
        .then(() -> up.onNext("test"))
        .then(() -> up.onNext("test"))
        .then(() -> up.onNext("test"))
        .thenRequest(2)
        .expectNext("test", "test")
        .thenCancel()
        .verify();
}
