reactor.core.publisher.Flux.concatMapDelayError()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(318)

本文整理了Java中reactor.core.publisher.Flux.concatMapDelayError()方法的一些代码示例,展示了Flux.concatMapDelayError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.concatMapDelayError()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:concatMapDelayError

Flux.concatMapDelayError介绍

[英]Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.

There are three dimensions to this operator that can be compared with #flatMap(Function) and #flatMapSequential(Function):

  • Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
  • Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
  • Interleaving: this operator does not let values from different inners interleave (concatenation).

Errors in the individual publishers will be delayed at the end of the whole concat sequence (possibly getting combined into a Exceptions#isMultiple(Throwable)if several sources error.
[中]将此流量发出的元素异步转换为发布服务器,然后将这些内部发布服务器展平为单个流量,并使用串联保持顺序。
此运算符有三个维度可与#flatMap(函数)和#flatMapSequential(函数)进行比较:
*生成内部文件和订阅:此操作符在生成下一个内部文件并订阅它之前,等待一个内部文件完成。
*展平值的顺序:该操作符自然保留与源元素相同的顺序,将每个源元素的内部元素按顺序连接起来。
*交错:该运算符不允许来自不同内部的值交错(串联)。
单个发布服务器中的错误将在整个concat序列结束时延迟(如果多个源出错,可能会合并为异常#isMultiple(Throwable))。

代码示例

代码示例来源:origin: reactor/reactor-core

return concatMapDelayError(mapper, Queues.XS_BUFFER_SIZE);

代码示例来源:origin: reactor/reactor-core

@Test
public void prefetchMaxTranslatesToUnboundedRequest2() {
  AtomicLong requested = new AtomicLong();
  StepVerifier.create(Flux.just(1, 2, 3).hide()
              .doOnRequest(requested::set)
              .concatMapDelayError(i -> Flux.range(0, i), Integer.MAX_VALUE))
        .expectNext(0, 0, 1, 0, 1, 2)
        .verifyComplete();
  assertThat(requested.get())
      .isNotEqualTo(Integer.MAX_VALUE)
      .isEqualTo(Long.MAX_VALUE);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void boundaryFusionDelayError() {
  Flux.range(1, 10000)
    .publishOn(Schedulers.single())
    .map(t -> Thread.currentThread().getName().contains("single-") ? "single" : ("BAD-" + t + Thread.currentThread().getName()))
    .concatMapDelayError(Flux::just)
    .publishOn(Schedulers.elastic())
    .distinct()
    .as(StepVerifier::create)
    .expectFusion()
    .expectNext("single")
    .expectComplete()
    .verify(Duration.ofSeconds(5));
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_errorFromUpstreamFailure() {
  return Arrays.asList(
      scenario(f -> f.concatMap(Flux::just)),
      scenario(f -> f.concatMap(Flux::just, 1)).prefetch(1),
      scenario(f -> f.concatMapDelayError(Flux::just))
          .shouldHitDropErrorHookAfterTerminate(true),
      scenario(f -> f.concatMapDelayError(Flux::just, true, 32))
          .shouldHitDropErrorHookAfterTerminate(true)
  );
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void discardDelayedOnDrainMapperError() {
    StepVerifier.create(Flux.just(1, 2, 3)
                .concatMapDelayError(i -> { throw new IllegalStateException("boom"); }))
          .expectErrorMessage("boom")
          .verifyThenAssertThat()
          .hasDiscardedExactly(1);
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void delayErrorConcatMapVsFlatMap() {
  Function<Integer, Flux<String>> mapFunction = i -> {
    char c = (char) ('A' + i);
    return Flux.range(1, i + 1)
          .doOnNext(v -> {
            if (i == 3 && v == 3) {
              throw new IllegalStateException("boom " + c + v);
          }
          })
      .map(v -> "" + c + "" + v);
  };
  Flux<Integer> source = Flux.range(0, 5);
  Flux<String> concatMap = source.concatMapDelayError(mapFunction)
                  .materialize()
                  .map(Object::toString);
  Flux<String> flatMap = source.flatMapDelayError(mapFunction, 2, 32)
                  .materialize()
                  .map(Object::toString);
  List<String> signalsConcat = concatMap.collectList().block();
  List<String> signalsFlat = flatMap.collectList().block();
  Assertions.assertThat(signalsConcat)
       .containsExactlyElementsOf(signalsFlat);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardDelayedOnCancel() {
  StepVerifier.create(Flux.just(1, 2, 3)
              .concatMapDelayError(i -> Mono.just("value" + i), 1),
      0)
        .thenCancel()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

Flux<String> concatMap = source.concatMapDelayError(mapFunction)
                .doOnError(t -> concatSuppressed.addAll(
                    Arrays.asList(t.getSuppressed())))

代码示例来源:origin: reactor/reactor-core

@Test
public void concatMapDelayErrorWithFluxError() {
  StepVerifier.create(
      Flux.just(
          Flux.just(1, 2),
          Flux.<Integer>error(new Exception("test")),
          Flux.just(3, 4))
        .concatMapDelayError(f -> f, true, 32))
        .expectNext(1, 2, 3, 4)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void concatMapDelayErrorWithMonoError() {
  StepVerifier.create(
      Flux.just(
          Flux.just(1, 2),
          Mono.<Integer>error(new Exception("test")),
          Flux.just(3, 4))
        .concatMapDelayError(f -> f, true, 32))
        .expectNext(1, 2, 3, 4)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void publisherOfPublisherDelayEnd3() {
  StepVerifier.create(Flux.just(Flux.just(1, 2)
                   .concatWith(Flux.error(new Exception("test"))),
      Flux.just(3, 4))
              .concatMapDelayError(f -> f, true, 128))
        .expectNext(1, 2, 3, 4)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void publisherOfPublisherDelayEndNot3() {
  StepVerifier.create(Flux.just(Flux.just(1, 2)
                   .concatWith(Flux.error(new Exception("test"))),
      Flux.just(3, 4))
              .concatMapDelayError(f -> f, false, 128))
        .expectNext(1, 2)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorError() {
  return Arrays.asList(
      scenario(f -> f.concatMap(d -> {
        throw exception();
      })),
      scenario(f -> f.concatMap(d -> null))
        ,
      scenario(f -> f.concatMap(d -> {
        throw exception();
      }, 1)).prefetch(1),
      scenario(f -> f.concatMap(d -> null, 1))
          .prefetch(1)
          ,
      scenario(f -> f.concatMapDelayError(d -> {
        throw exception();
      }))
          .shouldHitDropErrorHookAfterTerminate(true),
      scenario(f -> f.concatMapDelayError(d -> null))
          .shouldHitDropErrorHookAfterTerminate(true)
          ,
      scenario(f -> f.concatMapDelayError(d -> {
        throw exception();
      }, true, 32))
          .shouldHitDropErrorHookAfterTerminate(true),
      scenario(f -> f.concatMapDelayError(d -> null, true, 32))
          .shouldHitDropErrorHookAfterTerminate(true)
  );
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Arrays.asList(
      scenario(f -> f.concatMap(Flux::just)),
      scenario(f -> f.concatMap(d -> Flux.just(d).hide())),
      scenario(f -> f.concatMap(d -> Flux.empty()))
          .receiverEmpty(),
      scenario(f -> f.concatMapDelayError(Flux::just)),
      scenario(f -> f.concatMapDelayError(d -> Flux.just(d).hide())),
      scenario(f -> f.concatMapDelayError(d -> Flux.empty()))
          .receiverEmpty(),
      scenario(f -> f.concatMapDelayError(Flux::just, true, 32)),
      scenario(f -> f.concatMapDelayError(d -> Flux.just(d).hide(), true, 32)),
      scenario(f -> f.concatMapDelayError(d -> Flux.empty(), true, 32))
          .receiverEmpty(),
      scenario(f -> f.concatMap(Flux::just, 1)).prefetch(1),
      //scenarios with fromCallable(() -> null)
      scenario(f -> f.concatMap(d -> Mono.fromCallable(() -> null)))
          .receiverEmpty(),
      scenario(f -> f.concatMap(d -> Mono.fromCallable(() -> null), 1))
          .prefetch(1)
          .receiverEmpty(),
      scenario(f -> f.concatMapDelayError(d -> Mono.fromCallable(() -> null)))
          .shouldHitDropErrorHookAfterTerminate(true)
          .receiverEmpty(),
      scenario(f -> f.concatMapDelayError(d -> Mono.fromCallable(() -> null), true, 32))
          .shouldHitDropErrorHookAfterTerminate(true)
          .receiverEmpty()
  );
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalBoundary() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 2)
    .concatMapDelayError(v -> Flux.range(v, 2))
    .subscribe(ts);
  ts.assertValues(1, 2, 2, 3)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void normalBoundary2() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 2)
    .hide()
    .concatMapDelayError(v -> Flux.range(v, 2))
    .subscribe(ts);
  ts.assertValues(1, 2, 2, 3)
   .assertNoError()
   .assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueDelayErrorsWithCallable() {
  Flux<Integer> test = Flux
      .just(1, 2)
      .hide()
      .concatMapDelayError(f -> {
        if(f == 1){
          return Mono.<Integer>error(new NullPointerException());
        }
        else {
          return Mono.just(f);
        }
      })
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(2)
      .expectComplete()
      .verifyThenAssertThat()
      .hasDropped(1)
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorModeContinueDelayErrors() {
  Flux<Integer> test = Flux
      .just(1, 2)
      .hide()
      .concatMapDelayError(f -> {
        if(f == 1){
          return Mono.<Integer>error(new NullPointerException()).hide();
        }
        else {
          return Mono.just(f);
        }
      })
      .onErrorContinue(OnNextFailureStrategyTest::drop);
  StepVerifier.create(test)
      .expectNoFusionSupport()
      .expectNext(2)
      .expectComplete()
      .verifyThenAssertThat()
      // When inner is not a Callable error value is not available.
      .hasNotDroppedElements()
      .hasDroppedErrors(1);
}

代码示例来源:origin: reactor/reactor-kafka

@Override
public KafkaOutbound<K, V> sendTransactionally(Publisher<? extends Publisher<? extends ProducerRecord<K, V>>> transactionRecords) {
  return then(Flux.from(transactionRecords)
          .publishOn(sender.senderOptions.scheduler())
          .concatMapDelayError(records -> sender.transaction(records), false, 1));
}

代码示例来源:origin: reactor/reactor-kafka

@Override
public <T> Flux<Flux<SenderResult<T>>> sendTransactionally(Publisher<? extends Publisher<? extends SenderRecord<K, V, T>>> transactionRecords) {
  UnicastProcessor<Object> processor = UnicastProcessor.create();
  return Flux.from(transactionRecords)
        .publishOn(senderOptions.scheduler(), false, 1)
        .concatMapDelayError(records -> transaction(records, processor), false, 1)
        .window(processor)
        .doOnTerminate(() -> processor.onComplete())
        .doOnCancel(() -> processor.onComplete());
}

相关文章

Flux类方法