reactor.core.publisher.Flux.checkpoint()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(385)

本文整理了Java中reactor.core.publisher.Flux.checkpoint()方法的一些代码示例,展示了Flux.checkpoint()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.checkpoint()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:checkpoint

Flux.checkpoint介绍

[英]Activate assembly tracing for this particular Flux, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.

It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
[中]在检查点上游出现错误的情况下,激活此特定流量的程序集跟踪。跟踪会产生异常堆栈跟踪创建的成本。
应将其放置在反应链的末端,因为无法观察到在其下游触发的错误,并且无法通过装配跟踪来增加错误。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Activate assembly tracing for this particular {@link Flux}, in case of an error
 * upstream of the checkpoint. Tracing incurs the cost of an exception stack trace
 * creation.
 * <p>
 * It should be placed towards the end of the reactive chain, as errors
 * triggered downstream of it cannot be observed and augmented with assembly trace.
 *
 * @return the assembly tracing {@link Flux}.
 */
public final Flux<T> checkpoint() {
  return checkpoint(null, true);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Activate assembly marker for this particular {@link Flux} by giving it a description that
 * will be reflected in the assembly traceback in case of an error upstream of the
 * checkpoint. Note that unlike {@link #checkpoint()}, this doesn't create a
 * filled stack trace, avoiding the main cost of the operator.
 * However, as a trade-off the description must be unique enough for the user to find
 * out where this Flux was assembled. If you only want a generic description, and
 * still rely on the stack trace to find the assembly site, use the
 * {@link #checkpoint(String, boolean)} variant.
 * <p>
 * It should be placed towards the end of the reactive chain, as errors
 * triggered downstream of it cannot be observed and augmented with assembly trace.
 *
 * @param description a unique enough description to include in the light assembly traceback.
 * @return the assembly marked {@link Flux}
 */
public final Flux<T> checkpoint(String description) {
  return checkpoint(Objects.requireNonNull(description), false);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void operatorChainWithCheckpoint() {
  Flux<String> flux  = Flux.just("foo")
               .checkpoint("checkpointHere", true)
               .map(a -> a);
  assertThat(Scannable.from(flux).steps())
      .containsExactly(
          "source(FluxJust)",
          "Flux.checkpoint ⇢ reactor.core.ScannableTest.operatorChainWithCheckpoint(ScannableTest.java:614)",
          "map"
      );
}

代码示例来源:origin: reactor/reactor-core

@Test
public void operatorChainWithLightCheckpoint() {
  Flux<String> flux  = Flux.just("foo")
               .checkpoint("checkpointHere")
               .map(a -> a);
  assertThat(Scannable.from(flux).steps())
      .containsExactly(
          "source(FluxJust)",
          "checkpoint(\"checkpointHere\")",
          "map"
      );
}

代码示例来源:origin: reactor/reactor-core

@Test
public void checkpointEmpty() {
  StringWriter sw = new StringWriter();
  Flux<Integer> tested = Flux.range(1, 10)
                .map(i -> i < 3 ? i : null)
                .filter(i -> i % 2 == 0)
                .checkpoint()
                .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
  StepVerifier.create(tested)
        .expectNext(2)
        .verifyError();
  String debugStack = sw.toString();
  assertThat(debugStack).contains("Assembly trace from producer [reactor.core.publisher.FluxFilterFuseable] :");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void checkpointWithDescriptionIsLight() {
  StringWriter sw = new StringWriter();
  Flux<Integer> tested = Flux.range(1, 10)
                .map(i -> i < 3 ? i : null)
                .filter(i -> i % 2 == 0)
                .checkpoint("foo")
                .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
  StepVerifier.create(tested)
        .expectNext(2)
        .verifyError();
  String debugStack = sw.toString();
  assertThat(debugStack).contains("Assembly site of producer [reactor.core.publisher.FluxFilterFuseable] is identified by light checkpoint [foo].");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void checkpointDescriptionAndForceStack() {
  StringWriter sw = new StringWriter();
  Flux<Integer> tested = Flux.range(1, 10)
                .map(i -> i < 3 ? i : null)
                .filter(i -> i % 2 == 0)
                .checkpoint("foo", true)
                .doOnError(t -> t.printStackTrace(new PrintWriter(sw)));
  StepVerifier.create(tested)
        .expectNext(2)
        .verifyError();
  String debugStack = sw.toString();
  assertThat(debugStack).contains("Assembly trace from producer [reactor.core.publisher.FluxFilterFuseable], described as [foo] :");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void checkpointEmptyAndDebug() {
  StringWriter sw = new StringWriter();
  Hooks.onOperatorDebug();
  try {
    Flux<Integer> tested = Flux.range(1, 10)
                  .map(i -> i < 3 ? i : null)
                  .filter(i -> i % 2 == 0)
                  .checkpoint()
                  .doOnError(t -> t.printStackTrace(new PrintWriter(
                      sw)));
    StepVerifier.create(tested)
          .expectNext(2)
          .verifyError();
    String debugStack = sw.toString();
    assertThat(debugStack).contains(
        "Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :");
  }
  finally {
    Hooks.resetOnOperatorDebug();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onAssemblyDescription() {
  String fluxOnAssemblyStr = Flux.just(1).checkpoint("onAssemblyDescription").toString();
  String expectedDescription = "checkpoint(\"onAssemblyDescription\")";
  assertTrue("Description not included: " + fluxOnAssemblyStr, fluxOnAssemblyStr.contains(expectedDescription));
  String parallelFluxOnAssemblyStr = Flux.range(1, 10).parallel(2).checkpoint("onAssemblyDescription").toString();
  assertTrue("Description not included: " + parallelFluxOnAssemblyStr, parallelFluxOnAssemblyStr.contains(expectedDescription));
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Activate assembly tracing for this particular {@link Flux}, in case of an error
 * upstream of the checkpoint. Tracing incurs the cost of an exception stack trace
 * creation.
 * <p>
 * It should be placed towards the end of the reactive chain, as errors
 * triggered downstream of it cannot be observed and augmented with assembly trace.
 *
 * @return the assembly tracing {@link Flux}.
 */
public final Flux<T> checkpoint() {
  return checkpoint(null, true);
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<ServiceBroker> list() {
  return this.cloudFoundryClient
    .flatMapMany(DefaultServiceAdmin::requestListServiceBrokers)
    .map(this::toServiceBroker)
    .transform(OperationsLogging.log("List Service Brokers"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<OrganizationQuota> listQuotas() {
  return this.cloudFoundryClient
    .flatMapMany(DefaultOrganizationAdmin::requestListOrganizationQuotas)
    .map(DefaultOrganizationAdmin::toOrganizationQuota)
    .transform(OperationsLogging.log("List Organization Quotas"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<Buildpack> list() {
  return this.cloudFoundryClient
    .flatMapMany(DefaultBuildpacks::requestBuildpacks)
    .map(DefaultBuildpacks::toBuildpackResource)
    .transform(OperationsLogging.log("List Buildpacks"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<ApplicationSummary> list() {
  return Mono
    .zip(this.cloudFoundryClient, this.spaceId)
    .flatMap(function(DefaultApplications::requestSpaceSummary))
    .flatMapMany(DefaultApplications::extractApplications)
    .map(DefaultApplications::toApplicationSummary)
    .transform(OperationsLogging.log("List Applications"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<RouterGroup> listRouterGroups() {
  return this.routingClient
    .flatMapMany(DefaultDomains::requestListRouterGroups)
    .flatMapIterable(ListRouterGroupsResponse::getRouterGroups)
    .map(DefaultDomains::toRouterGroup)
    .transform(OperationsLogging.log("List Router Groups"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<SpaceQuota> listQuotas() {
  return Mono
    .zip(this.cloudFoundryClient, this.organizationId)
    .flatMapMany(function(DefaultSpaceAdmin::requestSpaceQuotaDefinitions))
    .map(DefaultSpaceAdmin::toSpaceQuota)
    .transform(OperationsLogging.log("List Space Quota"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<SpaceSummary> list() {
  return Mono
    .zip(this.cloudFoundryClient, this.organizationId)
    .flatMapMany(function(DefaultSpaces::requestSpaces))
    .map(DefaultSpaces::toSpaceSummary)
    .transform(OperationsLogging.log("List Spaces"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<Task> listTasks(ListApplicationTasksRequest request) {
  return Mono
    .zip(this.cloudFoundryClient, this.spaceId)
    .flatMap(function((cloudFoundryClient, spaceId) -> Mono.zip(
      Mono.just(cloudFoundryClient),
      getApplicationIdV3(cloudFoundryClient, request.getName(), spaceId))
    ))
    .flatMapMany(function((cloudFoundryClient, applicationId) -> requestListTasks(cloudFoundryClient, applicationId)))
    .map(DefaultApplications::toTask)
    .transform(OperationsLogging.log("List Application Tasks"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<Void> remove(RemoveNetworkPolicyRequest request) {
  return Mono
    .zip(this.cloudFoundryClient, this.networkingClient, this.spaceId)
    .flatMapMany(function((cloudFoundryClient, networkingClient, spaceId) -> Mono.zip(
      Mono.just(networkingClient),
      getApplicationsByName(cloudFoundryClient, spaceId)
    )))
    .flatMap(function((networkingClient, applications) -> requestRemovePolicy(networkingClient, applications, request)))
    .transform(OperationsLogging.log("Remove Network Policy"))
    .checkpoint();
}

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

@Override
public Flux<Policy> list(ListNetworkPoliciesRequest request) {
  return Mono
    .zip(this.cloudFoundryClient, this.networkingClient, this.spaceId)
    .flatMapMany(function((cloudFoundryClient, networkingClient, spaceId) -> Mono.zip(
      getApplicationsById(cloudFoundryClient, spaceId),
      getPolicies(networkingClient)
    )))
    .flatMap(function((applications, policies) -> toPolicy(applications, policies, request)))
    .transform(OperationsLogging.log("List Network Policies"))
    .checkpoint();
}

相关文章

Flux类方法