Usage and code examples of the reactor.core.publisher.Flux.usingWhen() method

x33g5p2x, reposted 2022-01-19 in Other

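This article collects code examples of the Java method reactor.core.publisher.Flux.usingWhen() and shows how Flux.usingWhen() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should be a useful reference. The method is identified as follows:
Package: reactor.core.publisher
Class name: Flux
Method name: usingWhen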

About Flux.usingWhen

Uses a resource, generated by a Publisher for each individual Subscriber, while streaming the values from a Publisher derived from the same resource. Whenever the resulting sequence terminates, a provided Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).
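The behaviour described above can be illustrated with a minimal sketch, assuming reactor-core is on the classpath. The DemoConnection class and its query() and close() methods are hypothetical names invented for this illustration; they are not part of Reactor or of any project quoted in this article. The sketch uses the three-argument overload, in which a single function supplies the cleanup Publisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UsingWhenSketch {

    /** Hypothetical resource that records the order in which it is used and released. */
    static final class DemoConnection {
        final List<String> log = new CopyOnWriteArrayList<>();

        /** Main sequence derived from the resource. */
        Flux<String> query() {
            return Flux.just("row-1", "row-2")
                       .doOnSubscribe(s -> log.add("query"));
        }

        /** Asynchronous cleanup; it emits no values into the main sequence. */
        Mono<Void> close() {
            return Mono.fromRunnable(() -> log.add("close"));
        }
    }

    /** Streams the rows of the given connection and closes it when the sequence terminates. */
    static List<String> run(DemoConnection connection) {
        Flux<String> rows = Flux.usingWhen(
                Mono.just(connection),   // resource publisher, subscribed once per Subscriber
                DemoConnection::query,   // values come from a Publisher derived from the resource
                DemoConnection::close);  // cleanup Publisher, applied when the sequence terminates
        // Blocking is acceptable in a demo; avoid it inside reactive pipelines.
        return rows.collectList().block();
    }

    public static void main(String[] args) {
        DemoConnection connection = new DemoConnection();
        System.out.println(run(connection));
        System.out.println(connection.log);
    }
}
```

In this sketch the collected list should contain only the two rows, because the cleanup Publisher contributes no elements, and the log should record "query" before "close", because the cleanup is only subscribed once the main sequence has terminated.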

Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators#onNextDropped(Object,Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators#onErrorDropped(Throwable,Context)). An empty completion or error without at least one onNext signal triggers a short-circuit of the main sequence with the same terminal signal (no resource is established, no cleanup is invoked).
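The two edge cases in this note can be checked with a small sketch, again assuming reactor-core is on the classpath. The run helper and the counter it takes are hypothetical and exist only for this illustration. The sketch uses plain reactor-core rather than the StepVerifier utilities that appear in the test examples.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UsingWhenEdgeCases {

    /**
     * Hypothetical helper: derives two values from whatever resource the publisher
     * emits and counts how often the cleanup function is applied.
     */
    static List<String> run(Flux<String> resourcePublisher, AtomicInteger cleanups) {
        return Flux.usingWhen(
                        resourcePublisher,
                        resource -> Flux.just(resource + ":a", resource + ":b"),
                        resource -> Mono.fromRunnable(cleanups::incrementAndGet))
                   .collectList()
                   .block(); // blocking is for demonstration only
    }

    public static void main(String[] args) {
        // Case 1: the resource publisher completes empty, so no resource is
        // established and the cleanup function is never applied.
        AtomicInteger emptyCleanups = new AtomicInteger();
        List<String> fromEmpty = run(Flux.empty(), emptyCleanups);
        System.out.println(fromEmpty + " cleanups=" + emptyCleanups.get());

        // Case 2: the resource publisher offers two resources; only the first is used.
        AtomicInteger multiCleanups = new AtomicInteger();
        List<String> fromMany = run(Flux.just("r1", "r2"), multiCleanups);
        System.out.println(fromMany + " cleanups=" + multiCleanups.get());
    }
}
```

With an empty resource publisher the result should be an empty list and a cleanup count of zero. With two candidate resources only values derived from "r1" should appear, and the cleanup should run once.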

Code examples

Code example source: spring-projects/spring-data-mongodb

@Override
public <T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally) {
  return cachedSession.flatMapMany(session -> {
    if (!session.hasActiveTransaction()) {
      session.startTransaction();
    }
    return Flux.usingWhen(Mono.just(session), //
        s -> ReactiveMongoTemplate.this.withSession(action, s), //
        ClientSession::commitTransaction, //
        ClientSession::abortTransaction) //
        .doFinally(signalType -> doFinally.accept(session));
  });
}

Code example source: reactor/reactor-core

@Test
public void nullResourcePublisherRejected() {
  assertThatNullPointerException()
      .isThrownBy(() -> Flux.usingWhen(null,
          tr -> Mono.empty(),
          tr -> Mono.empty(),
          tr -> Mono.empty()))
      .withMessage("resourceSupplier")
      .withNoCause();
}

Code example source: reactor/reactor-core

@Test
public void emptyResourceCallableDoesntApplyCallback() {
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  Flux<String> test = Flux.usingWhen(Flux.empty(),
      tr -> Mono.just("unexpected"),
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .verifyComplete();
  assertThat(commitDone).isFalse();
  assertThat(rollbackDone).isFalse();
}

Code example source: reactor/reactor-core

@Test
public void blockOnNeverResourceCanBeCancelled() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  Disposable disposable = Flux.usingWhen(Flux.<String>never(),
      Flux::just,
      Flux::just,
      Flux::just,
      Flux::just)
                .doFinally(f -> latch.countDown())
                .subscribe();
  assertThat(latch.await(500, TimeUnit.MILLISECONDS))
      .as("hangs before dispose").isFalse();
  disposable.dispose();
  assertThat(latch.await(100, TimeUnit.MILLISECONDS))
      .as("terminates after dispose").isTrue();
}

Code example source: reactor/reactor-core

@Test
public void failToGenerateClosureAppliesRollback() {
  TestResource testResource = new TestResource();
  Flux<String> test = Flux.usingWhen(Mono.just(testResource),
      tr -> {
        throw new UnsupportedOperationException("boom");
      },
      TestResource::commit,
      TestResource::rollback);
  StepVerifier.create(test)
        .verifyErrorSatisfies(e -> assertThat(e).hasMessage("boom"));
  testResource.commitProbe.assertWasNotSubscribed();
  testResource.rollbackProbe.assertWasSubscribed();
}

Code example source: reactor/reactor-core

@Test
public void nullClosureAppliesRollback() {
  TestResource testResource = new TestResource();
  Flux<String> test = Flux.usingWhen(Mono.just(testResource),
      tr -> null,
      TestResource::commit,
      TestResource::rollback);
  StepVerifier.create(test)
        .verifyErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(NullPointerException.class)
            .hasMessage("The resourceClosure function returned a null value"));
  testResource.commitProbe.assertWasNotSubscribed();
  testResource.rollbackProbe.assertWasSubscribed();
}

Code example source: reactor/reactor-core

@Test
public void emptyResourcePublisherDoesntApplyCallback() {
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  Flux<String> test = Flux.usingWhen(Flux.empty().hide(),
      tr -> Mono.just("unexpected"),
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .verifyComplete();
  assertThat(commitDone).isFalse();
  assertThat(rollbackDone).isFalse();
}

Code example source: reactor/reactor-core

@Test
public void errorResourceCallableDoesntApplyCallback() {
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  Flux<String> test = Flux.usingWhen(Flux.error(new IllegalStateException("boom")),
      tr -> Mono.just("unexpected"),
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .verifyErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessage("boom")
            .hasNoCause()
            .hasNoSuppressedExceptions()
        );
  assertThat(commitDone).isFalse();
  assertThat(rollbackDone).isFalse();
}

Code example source: reactor/reactor-core

@Test
public void errorResourcePublisherDoesntApplyCallback() {
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  Flux<String> test = Flux.usingWhen(Flux.error(new IllegalStateException("boom")).hide(),
      tr -> Mono.just("unexpected"),
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .verifyErrorSatisfies(e -> assertThat(e)
            .isInstanceOf(IllegalStateException.class)
            .hasMessage("boom")
            .hasNoCause()
            .hasNoSuppressedExceptions()
        );
  assertThat(commitDone).isFalse();
  assertThat(rollbackDone).isFalse();
}

Code example source: reactor/reactor-core

@Test
public void monoResourcePublisherIsNotCancelled() {
  AtomicBoolean cancelled = new AtomicBoolean();
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  Mono<String> resourcePublisher = Mono.just("Resource")
                     .doOnCancel(() -> cancelled.set(true));
  Flux<String> test = Flux.usingWhen(resourcePublisher,
      Flux::just,
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .expectNext("Resource")
        .expectComplete()
        .verifyThenAssertThat()
        .hasNotDroppedErrors();
  assertThat(commitDone).isTrue();
  assertThat(rollbackDone).isFalse();
  assertThat(cancelled).as("resource publisher was not cancelled").isFalse();
}

Code example source: reactor/reactor-core

@Test
@Parameters(method = "sourcesTransactionError")
public void apiRollbackGeneratingNull(Flux<String> transactionWithError) {
  final AtomicReference<TestResource> ref = new AtomicReference<>();
  Flux<String> flux = Flux.usingWhen(Mono.fromCallable(TestResource::new),
      d -> {
        ref.set(d);
        return transactionWithError;
      },
      TestResource::commitError,
      TestResource::rollbackNull);
  StepVerifier.create(flux)
        .expectNext("Transaction started")
        .expectNext("work in transaction")
        .verifyErrorSatisfies(e -> assertThat(e)
            .hasMessage("The asyncError returned a null Publisher")
            .isInstanceOf(NullPointerException.class)
            .hasSuppressedException(new IllegalStateException("boom")));
  assertThat(ref.get())
      .isNotNull()
      .matches(tr -> !tr.commitProbe.wasSubscribed(), "no commit")
      .matches(tr -> !tr.rollbackProbe.wasSubscribed(), "rollback method short-circuited");
}

Code example source: reactor/reactor-core

@Test
public void fluxResourcePublisherIsCancelled() {
  AtomicBoolean cancelled = new AtomicBoolean();
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  Flux<String> resourcePublisher = Flux.just("Resource", "Something Else")
                     .doOnCancel(() -> cancelled.set(true));
  Flux<String> test = Flux.usingWhen(resourcePublisher,
      Mono::just,
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .expectNext("Resource")
        .expectComplete()
        .verifyThenAssertThat()
        .hasNotDroppedErrors();
  assertThat(commitDone).isTrue();
  assertThat(rollbackDone).isFalse();
  assertThat(cancelled).as("resource publisher was cancelled").isTrue();
}

Code example source: reactor/reactor-core

@Test
@Parameters(method = "sourcesFullTransaction")
public void commitGeneratingNull(Flux<String> fullTransaction) {
  final AtomicReference<TestResource> ref = new AtomicReference<>();
  Flux<String> flux = Flux.usingWhen(Mono.fromCallable(TestResource::new),
      d -> {
        ref.set(d);
        return fullTransaction;
      },
      TestResource::commitNull,
      TestResource::rollback);
  StepVerifier.create(flux)
        .expectNext("Transaction started")
        .expectNext("work in transaction")
        .expectNext("more work in transaction")
        .verifyErrorSatisfies(e -> assertThat(e)
            .hasMessage("The asyncComplete returned a null Publisher")
            .isInstanceOf(NullPointerException.class)
            .hasNoCause());
  assertThat(ref.get())
      .isNotNull()
      .matches(tr -> !tr.commitProbe.wasSubscribed(), "commit method short-circuited")
      .matches(tr -> !tr.rollbackProbe.wasSubscribed(), "no rollback");
}

Code example source: reactor/reactor-core

@Test
public void errorResourcePublisherAfterEmitIsDropped() {
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  TestPublisher<String> testPublisher = TestPublisher.createCold();
  testPublisher.next("Resource").error(new IllegalStateException("boom"));
  Flux<String> test = Flux.usingWhen(testPublisher,
      Mono::just,
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .expectNext("Resource")
        .expectComplete()
        .verifyThenAssertThat(Duration.ofSeconds(2))
        .hasDroppedErrorWithMessage("boom")
        .hasNotDroppedElements();
  assertThat(commitDone).isTrue();
  assertThat(rollbackDone).isFalse();
  testPublisher.assertCancelled();
}

Code example source: reactor/reactor-core

@Test
@Parameters(method = "sourcesFullTransaction")
public void apiCommitFailure(Flux<String> fullTransaction) {
  final AtomicReference<TestResource> ref = new AtomicReference<>();
  Flux<String> flux = Flux.usingWhen(Mono.fromCallable(TestResource::new),
      d -> {
        ref.set(d);
        return fullTransaction;
      },
      TestResource::commitError,
      TestResource::rollback);
  StepVerifier.create(flux)
        .expectNext("Transaction started")
        .expectNext("work in transaction")
        .expectNext("more work in transaction")
        .verifyErrorSatisfies(e -> assertThat(e)
            .hasMessage("Async resource cleanup failed after onComplete")
            .hasCauseInstanceOf(ArithmeticException.class));
  assertThat(ref.get())
      .isNotNull()
      .matches(tr -> tr.commitProbe.wasSubscribed(), "commit method used")
      .matches(tr -> !tr.rollbackProbe.wasSubscribed(), "no rollback");
}

Code example source: reactor/reactor-core

@Test
public void secondResourceInPublisherIsDropped() {
  AtomicBoolean commitDone = new AtomicBoolean();
  AtomicBoolean rollbackDone = new AtomicBoolean();
  TestPublisher<String> testPublisher = TestPublisher.createCold();
  testPublisher.emit("Resource", "boom");
  Flux<String> test = Flux.usingWhen(testPublisher,
      Mono::just,
      tr -> Mono.fromRunnable(() -> commitDone.set(true)),
      tr -> Mono.fromRunnable(() -> rollbackDone.set(true)));
  StepVerifier.create(test)
        .expectNext("Resource")
        .expectComplete()
        .verifyThenAssertThat(Duration.ofSeconds(2))
        .hasDropped("boom")
        .hasNotDroppedErrors();
  assertThat(commitDone).isTrue();
  assertThat(rollbackDone).isFalse();
  testPublisher.assertCancelled();
}

Code example source: reactor/reactor-core

@Test
public void apiAsyncCleanup() {
  final AtomicReference<TestResource> ref = new AtomicReference<>();
  Flux<String> flux = Flux.usingWhen(Mono.fromCallable(TestResource::new),
      d -> {
        ref.set(d);
        return d.data().concatWithValues("work in transaction");
      },
      TestResource::commit);
  StepVerifier.create(flux)
        .expectNext("Transaction started")
        .expectNext("work in transaction")
        .verifyComplete();
  assertThat(ref.get())
      .isNotNull()
      .matches(tr -> tr.commitProbe.wasSubscribed(), "commit method used")
      .matches(tr -> !tr.rollbackProbe.wasSubscribed(), "no rollback");
}

Code example source: reactor/reactor-core

@Test
@Parameters(method = "sourcesFullTransaction")
public void apiCommit(Flux<String> fullTransaction) {
  final AtomicReference<TestResource> ref = new AtomicReference<>();
  Flux<String> flux = Flux.usingWhen(Mono.fromCallable(TestResource::new),
      d -> {
        ref.set(d);
        return fullTransaction;
      },
      TestResource::commit,
      TestResource::rollback);
  StepVerifier.create(flux)
        .expectNext("Transaction started")
        .expectNext("work in transaction")
        .expectNext("more work in transaction")
        .expectComplete()
        .verify();
  assertThat(ref.get())
      .isNotNull()
      .matches(tr -> tr.commitProbe.wasSubscribed(), "commit method used")
      .matches(tr -> !tr.rollbackProbe.wasSubscribed(), "no rollback");
}

Code example source: reactor/reactor-core

@Test
@Parameters(method = "sources01")
public void cancelWithHandler(Flux<String> source) {
  TestResource testResource = new TestResource();
  Flux<String> test = Flux.usingWhen(Mono.just(testResource),
      tr -> source,
      TestResource::commit,
      TestResource::rollback,
      TestResource::rollback)
              .take(2);
  StepVerifier.create(test)
        .expectNext("0", "1")
        .verifyComplete();
  testResource.commitProbe.assertWasNotSubscribed();
  testResource.rollbackProbe.assertWasSubscribed();
}

Code example source: reactor/reactor-core

@Test
@Parameters(method = "sources01")
public void cancelWithoutHandlerAppliesCommit(Flux<String> source) {
  TestResource testResource = new TestResource();
  Flux<String> test = Flux
      .usingWhen(Mono.just(testResource).hide(),
          tr -> source,
          TestResource::commit,
          TestResource::rollback)
      .take(2);
  StepVerifier.create(test)
        .expectNext("0", "1")
        .verifyComplete();
  testResource.commitProbe.assertWasSubscribed();
  testResource.rollbackProbe.assertWasNotSubscribed();
}
