reactor.core.publisher.Flux.doOnComplete()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.6k)|赞(0)|评价(0)|浏览(906)

本文整理了Java中reactor.core.publisher.Flux.doOnComplete()方法的一些代码示例,展示了Flux.doOnComplete()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.doOnComplete()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:doOnComplete

Flux.doOnComplete介绍

[英]Add behavior (side-effect) triggered when the Flux completes successfully.
[中]添加流量成功完成时触发的行为(副作用)。

代码示例

代码示例来源:origin: spring-projects/spring-framework

public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
    @Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
  if (publisher != null && publisherNested != null) {
    throw new IllegalArgumentException("At most one publisher expected");
  }
  this.publisher = publisher != null ?
      Flux.from(publisher)
          .doOnSubscribe(s -> this.hasContentConsumer = true)
          .doOnNext(this.buffer::write)
          .doOnError(this::handleOnError)
          .doOnCancel(this::handleOnComplete)
          .doOnComplete(this::handleOnComplete) : null;
  this.publisherNested = publisherNested != null ?
      Flux.from(publisherNested)
          .doOnSubscribe(s -> this.hasContentConsumer = true)
          .map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError))
          .doOnError(this::handleOnError)
          .doOnCancel(this::handleOnComplete)
          .doOnComplete(this::handleOnComplete) : null;
  if (publisher == null && publisherNested == null) {
    this.content.onComplete();
  }
}

代码示例来源:origin: reactor/reactor-core

private void subscribe(Flux<Flux<Integer>> windows) {
  mainSubscriber = AssertSubscriber.create();
  windows.doOnCancel(() -> mainCancelled.incrementAndGet())
      .doOnComplete(() -> mainCompleted.incrementAndGet())
      .doOnTerminate(() -> mainTerminated.incrementAndGet()).subscribe(mainSubscriber);
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void streamStateRelatedSignalsCanBeConsumed() {
//        "Stream "state" related signals can be consumed"
//        given: "a composable with values 1 to 5 inclusive"
    Flux<Integer> stream = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5));
    List<Integer> values = new ArrayList<>();
    List<String> signals = new ArrayList<>();

//        when: "a Subscribe Consumer is registered"
    stream = stream.doOnSubscribe(s -> signals.add("subscribe"));

//        and: "a Cancel Consumer is registered"
    stream = stream.doOnCancel(() -> signals.add("cancel"));

//        and: "a Complete Consumer is registered"
    stream = stream.doOnComplete(() -> signals.add("complete"));

//        and: "the flux is consumed"
    stream.subscribe(values::add);

//        then: "the initial values are passed"
    assertThat(values).containsExactly(1, 2, 3, 4, 5);
    assertThat(signals).containsExactly("subscribe", "complete");
  }

代码示例来源:origin: reactor/reactor-core

@Test
  public void streamCanEmitDefaultValueIfEmpty() {
//        "Stream can emit a default value if empty"
//        given: "a composable that only completes"
    Flux<String> stream = Flux.empty();
    List<String> values = new ArrayList<>();

//        when: "a Subscribe Consumer is registered"
    stream = stream.defaultIfEmpty("test")
            .doOnComplete(() -> values.add("complete"));

//        and: "the flux is consumed"
    stream.subscribe(values::add);

//        then: "the initial values are passed"
    assertThat(values).containsExactly("test", "complete");
  }

代码示例来源:origin: reactor/reactor-core

@Test
public void syncPollCompleteCalled() {
  AtomicBoolean onComplete = new AtomicBoolean();
  ConnectableFlux<Integer> f = Flux.just(1)
                  .doOnComplete(() -> onComplete.set(true))
                  .publish();
  StepVerifier.create(f)
        .then(f::connect)
        .expectNext(1)
        .verifyComplete();
  assertThat(onComplete.get()).withFailMessage("onComplete not called back").isTrue();
}

代码示例来源:origin: reactor/reactor-core

private void expectWindow(int index, Predicate<? super Integer> innerCancelPredicate, List<Integer> values) {
  AssertSubscriber<Integer> s = AssertSubscriber.create();
  mainSubscriber.values().get(index)
      .doOnCancel(() -> innerCancelled.incrementAndGet())
      .doOnComplete(() -> {
        innerCompleted.incrementAndGet();})
      .doOnTerminate(() -> innerTerminated.incrementAndGet())
      .takeWhile(innerCancelPredicate).subscribe(s);
  s.assertValueSequence(values).assertNoError();
  innerCreated.incrementAndGet();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void syncPollConditionalCompleteCalled() {
  AtomicBoolean onComplete = new AtomicBoolean();
  ConnectableFlux<Integer> f = Flux.just(1)
                  .doOnComplete(() -> onComplete.set(true))
                  .filter(v -> true)
                  .publish();
  StepVerifier.create(f)
        .then(f::connect)
        .expectNext(1)
        .verifyComplete();
  assertThat(onComplete.get()).withFailMessage("onComplete not called back").isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noFusionCompleteCalled() {
  AtomicBoolean onComplete = new AtomicBoolean();
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.range(1, 2)
    .doOnComplete(() -> onComplete.set(true))
    .subscribe(ts);
  ts.assertNoError()
   .assertValues(1, 2)
   .assertComplete();
  Assert.assertTrue("onComplete not called back", onComplete.get());
}

代码示例来源:origin: line/armeria

@Override
  public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    response.setStatusCode(HttpStatus.OK);
    return request.getBody().map(data -> {
      // skip data, then throw an exception.
      throw HttpStatusException.of(com.linecorp.armeria.common.HttpStatus.BAD_REQUEST);
    }).doOnComplete(() -> {
      // An HTTP GET request doesn't have a body, so onComplete will be immediately called.
      throw HttpStatusException.of(com.linecorp.armeria.common.HttpStatus.BAD_REQUEST);
    }).then();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void syncCompleteCalled() {
  AtomicBoolean onComplete = new AtomicBoolean();
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.range(1, 2)
    .hide()
    .doOnComplete(() -> onComplete.set(true))
    .subscribe(ts);
  ts.assertNoError()
   .assertValues(1, 2)
   .assertComplete();
  Assert.assertTrue("onComplete not called back", onComplete.get());
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void deferredFluxInitialValueLaterAvailableUpToLongMax() throws InterruptedException {
//        "A deferred Flux with an initial value makes that value available later up to Long.MAX "
//        given: "a composable with an initial value"
    AtomicReference<Throwable> e = new AtomicReference<>();
    CountDownLatch latch = new CountDownLatch(1);
    Flux<Integer> stream = Flux.fromIterable(Arrays.asList(1, 2, 3))
                  .publish()
                  .autoConnect()
                  .doOnError(e::set)
                  .doOnComplete(latch::countDown);

//        when: "cumulated request of Long MAX"
    long test = Long.MAX_VALUE / 2L;
    AssertSubscriber<Integer> controls =
        stream.subscribeWith(AssertSubscriber.create(0));
    controls.request(test);
    controls.request(test);
    controls.request(1);

    //sleep(2000)

//        then: "no error available"
    latch.await(2, TimeUnit.SECONDS);

    assertThat(e.get()).isNull();
  }

代码示例来源:origin: reactor/reactor-core

@Test
public void flatMapSignal2() {
  StepVerifier.create(Mono.just(1)
              .flatMapMany(d -> Flux.just(d * 2),
                  e -> Flux.just(99),
                  () -> Flux.just(10)).doOnComplete(() -> {
    System.out.println("test");
      })
  .log())
        .expectNext(2, 10)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

.doOnComplete(() -> {
  if (ref.get() != null) {
    Scannable t = ref.get();

代码示例来源:origin: reactor/reactor-core

.doOnNext(i -> processor.onNext(retainedDetector.tracked(new Wrapper(i))))
     .doOnError(processor::onError)
     .doOnComplete(processor::onComplete);
    retainedDetector.finalizedCount()))
.doOnNext(v -> LOGGER.debug(v.toString()))
.doOnComplete(latch::countDown)
.collectList();

代码示例来源:origin: reactor/reactor-core

.delayElements(Duration.ofMillis(10))
     .doOnNext(i -> processor.onNext(finalizedTracker.tracked(new Wrapper(i))))
     .doOnComplete(processor::onComplete);
.doOnComplete(latch::countDown)
.collectList();

代码示例来源:origin: reactor/reactor-core

@Test
public void completeCallbackError() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Throwable err = new Exception("test");
  Flux.just(1)
    .doOnComplete(() -> {
      throw Exceptions.propagate(err);
    })
    .subscribe(ts);
  //nominal error path (DownstreamException)
  ts.assertErrorMessage("test");
  ts = AssertSubscriber.create();
  try {
    Flux.just(1)
      .doOnComplete(() -> {
        throw Exceptions.bubble(err);
      })
      .subscribe(ts);
    Assert.fail();
  }
  catch (Exception e) {
    Assert.assertTrue(Exceptions.unwrap(e) == err);
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void completeCallbackError() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Throwable err = new Exception("test");
  Flux.just(1)
    .hide()
    .doOnComplete(() -> {
      throw Exceptions.propagate(err);
    })
    .subscribe(ts);
  //nominal error path (DownstreamException)
  ts.assertErrorMessage("test");
  ts = AssertSubscriber.create();
  try {
    Flux.just(1)
      .hide()
      .doOnComplete(() -> {
        throw Exceptions.bubble(err);
      })
      .subscribe(ts);
    fail();
  }
  catch (Exception e) {
    Assert.assertTrue(Exceptions.unwrap(e) == err);
  }
}

代码示例来源:origin: reactor/reactor-core

})).producerNever(),
scenario(f -> f.doOnComplete(() -> {
  throw exception();
}))
}, null, null)).producerEmpty(),
scenario(f -> f.doOnComplete(() -> {
          throw exception();
        })).producerEmpty(),

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Arrays.asList(scenario(f -> f.doOnSubscribe(s -> {
      })),
      scenario(f -> f.doOnError(s -> {
      })),
      scenario(f -> f.doOnTerminate(() -> {
      })),
      scenario(f -> f.doAfterTerminate(() -> {
      })),
      scenario(f -> f.doOnCancel(() -> {
      })),
      scenario(f -> f.doOnComplete(() -> {
      })),
      scenario(f -> f.doOnRequest(d -> {
      })),
      scenario(f -> f.doOnRequest(s -> {
        throw new RuntimeException(); //ignored
      })),
      scenario(f -> f.doOnNext(s -> {
      })),
      scenario(f -> f.doOnError(s -> {
      })));
}

代码示例来源:origin: rsocket/rsocket-java

@Override
public Flux<Payload> requestStream(Payload payload) {
 return source
   .requestStream(payload)
   .doOnError(th -> errorPercentage.insert(0.0))
   .doOnComplete(() -> updateErrorPercentage(1.0));
}

相关文章

Flux类方法