本文整理了Java中reactor.core.publisher.Flux.concatMapDelayError()
方法的一些代码示例,展示了Flux.concatMapDelayError()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.concatMapDelayError()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:concatMapDelayError
[英]Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
There are three dimensions to this operator that can be compared with #flatMap(Function) and #flatMapSequential(Function):
Errors in the individual publishers will be delayed at the end of the whole concat sequence (possibly getting combined into a Exceptions#isMultiple(Throwable)if several sources error.
[中]将此流量发出的元素异步转换为发布服务器,然后将这些内部发布服务器展平为单个流量,并使用串联保持顺序。
此运算符有三个维度可与#flatMap(函数)和#flatMapSequential(函数)进行比较:
*生成内部文件和订阅:此操作符在生成下一个内部文件并订阅它之前,等待一个内部文件完成。
*展平值的顺序:该操作符自然保留与源元素相同的顺序,将每个源元素的内部元素按顺序连接起来。
*交错:该运算符不允许来自不同内部的值交错(串联)。
单个发布服务器中的错误将在整个concat序列结束时延迟(如果多个源出错,可能会合并为异常#isMultiple(Throwable))。
代码示例来源:origin: reactor/reactor-core
return concatMapDelayError(mapper, Queues.XS_BUFFER_SIZE);
代码示例来源:origin: reactor/reactor-core
@Test
public void prefetchMaxTranslatesToUnboundedRequest2() {
AtomicLong requested = new AtomicLong();
StepVerifier.create(Flux.just(1, 2, 3).hide()
.doOnRequest(requested::set)
.concatMapDelayError(i -> Flux.range(0, i), Integer.MAX_VALUE))
.expectNext(0, 0, 1, 0, 1, 2)
.verifyComplete();
assertThat(requested.get())
.isNotEqualTo(Integer.MAX_VALUE)
.isEqualTo(Long.MAX_VALUE);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void boundaryFusionDelayError() {
Flux.range(1, 10000)
.publishOn(Schedulers.single())
.map(t -> Thread.currentThread().getName().contains("single-") ? "single" : ("BAD-" + t + Thread.currentThread().getName()))
.concatMapDelayError(Flux::just)
.publishOn(Schedulers.elastic())
.distinct()
.as(StepVerifier::create)
.expectFusion()
.expectNext("single")
.expectComplete()
.verify(Duration.ofSeconds(5));
}
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_errorFromUpstreamFailure() {
return Arrays.asList(
scenario(f -> f.concatMap(Flux::just)),
scenario(f -> f.concatMap(Flux::just, 1)).prefetch(1),
scenario(f -> f.concatMapDelayError(Flux::just))
.shouldHitDropErrorHookAfterTerminate(true),
scenario(f -> f.concatMapDelayError(Flux::just, true, 32))
.shouldHitDropErrorHookAfterTerminate(true)
);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardDelayedOnDrainMapperError() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatMapDelayError(i -> { throw new IllegalStateException("boom"); }))
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1);
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void delayErrorConcatMapVsFlatMap() {
Function<Integer, Flux<String>> mapFunction = i -> {
char c = (char) ('A' + i);
return Flux.range(1, i + 1)
.doOnNext(v -> {
if (i == 3 && v == 3) {
throw new IllegalStateException("boom " + c + v);
}
})
.map(v -> "" + c + "" + v);
};
Flux<Integer> source = Flux.range(0, 5);
Flux<String> concatMap = source.concatMapDelayError(mapFunction)
.materialize()
.map(Object::toString);
Flux<String> flatMap = source.flatMapDelayError(mapFunction, 2, 32)
.materialize()
.map(Object::toString);
List<String> signalsConcat = concatMap.collectList().block();
List<String> signalsFlat = flatMap.collectList().block();
Assertions.assertThat(signalsConcat)
.containsExactlyElementsOf(signalsFlat);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void discardDelayedOnCancel() {
StepVerifier.create(Flux.just(1, 2, 3)
.concatMapDelayError(i -> Mono.just("value" + i), 1),
0)
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}
代码示例来源:origin: reactor/reactor-core
Flux<String> concatMap = source.concatMapDelayError(mapFunction)
.doOnError(t -> concatSuppressed.addAll(
Arrays.asList(t.getSuppressed())))
代码示例来源:origin: reactor/reactor-core
@Test
public void concatMapDelayErrorWithFluxError() {
StepVerifier.create(
Flux.just(
Flux.just(1, 2),
Flux.<Integer>error(new Exception("test")),
Flux.just(3, 4))
.concatMapDelayError(f -> f, true, 32))
.expectNext(1, 2, 3, 4)
.verifyErrorMessage("test");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void concatMapDelayErrorWithMonoError() {
StepVerifier.create(
Flux.just(
Flux.just(1, 2),
Mono.<Integer>error(new Exception("test")),
Flux.just(3, 4))
.concatMapDelayError(f -> f, true, 32))
.expectNext(1, 2, 3, 4)
.verifyErrorMessage("test");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publisherOfPublisherDelayEnd3() {
StepVerifier.create(Flux.just(Flux.just(1, 2)
.concatWith(Flux.error(new Exception("test"))),
Flux.just(3, 4))
.concatMapDelayError(f -> f, true, 128))
.expectNext(1, 2, 3, 4)
.verifyErrorMessage("test");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void publisherOfPublisherDelayEndNot3() {
StepVerifier.create(Flux.just(Flux.just(1, 2)
.concatWith(Flux.error(new Exception("test"))),
Flux.just(3, 4))
.concatMapDelayError(f -> f, false, 128))
.expectNext(1, 2)
.verifyErrorMessage("test");
}
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorError() {
return Arrays.asList(
scenario(f -> f.concatMap(d -> {
throw exception();
})),
scenario(f -> f.concatMap(d -> null))
,
scenario(f -> f.concatMap(d -> {
throw exception();
}, 1)).prefetch(1),
scenario(f -> f.concatMap(d -> null, 1))
.prefetch(1)
,
scenario(f -> f.concatMapDelayError(d -> {
throw exception();
}))
.shouldHitDropErrorHookAfterTerminate(true),
scenario(f -> f.concatMapDelayError(d -> null))
.shouldHitDropErrorHookAfterTerminate(true)
,
scenario(f -> f.concatMapDelayError(d -> {
throw exception();
}, true, 32))
.shouldHitDropErrorHookAfterTerminate(true),
scenario(f -> f.concatMapDelayError(d -> null, true, 32))
.shouldHitDropErrorHookAfterTerminate(true)
);
}
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
return Arrays.asList(
scenario(f -> f.concatMap(Flux::just)),
scenario(f -> f.concatMap(d -> Flux.just(d).hide())),
scenario(f -> f.concatMap(d -> Flux.empty()))
.receiverEmpty(),
scenario(f -> f.concatMapDelayError(Flux::just)),
scenario(f -> f.concatMapDelayError(d -> Flux.just(d).hide())),
scenario(f -> f.concatMapDelayError(d -> Flux.empty()))
.receiverEmpty(),
scenario(f -> f.concatMapDelayError(Flux::just, true, 32)),
scenario(f -> f.concatMapDelayError(d -> Flux.just(d).hide(), true, 32)),
scenario(f -> f.concatMapDelayError(d -> Flux.empty(), true, 32))
.receiverEmpty(),
scenario(f -> f.concatMap(Flux::just, 1)).prefetch(1),
//scenarios with fromCallable(() -> null)
scenario(f -> f.concatMap(d -> Mono.fromCallable(() -> null)))
.receiverEmpty(),
scenario(f -> f.concatMap(d -> Mono.fromCallable(() -> null), 1))
.prefetch(1)
.receiverEmpty(),
scenario(f -> f.concatMapDelayError(d -> Mono.fromCallable(() -> null)))
.shouldHitDropErrorHookAfterTerminate(true)
.receiverEmpty(),
scenario(f -> f.concatMapDelayError(d -> Mono.fromCallable(() -> null), true, 32))
.shouldHitDropErrorHookAfterTerminate(true)
.receiverEmpty()
);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalBoundary() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 2)
.concatMapDelayError(v -> Flux.range(v, 2))
.subscribe(ts);
ts.assertValues(1, 2, 2, 3)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalBoundary2() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 2)
.hide()
.concatMapDelayError(v -> Flux.range(v, 2))
.subscribe(ts);
ts.assertValues(1, 2, 2, 3)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void errorModeContinueDelayErrorsWithCallable() {
Flux<Integer> test = Flux
.just(1, 2)
.hide()
.concatMapDelayError(f -> {
if(f == 1){
return Mono.<Integer>error(new NullPointerException());
}
else {
return Mono.just(f);
}
})
.onErrorContinue(OnNextFailureStrategyTest::drop);
StepVerifier.create(test)
.expectNoFusionSupport()
.expectNext(2)
.expectComplete()
.verifyThenAssertThat()
.hasDropped(1)
.hasDroppedErrors(1);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void errorModeContinueDelayErrors() {
Flux<Integer> test = Flux
.just(1, 2)
.hide()
.concatMapDelayError(f -> {
if(f == 1){
return Mono.<Integer>error(new NullPointerException()).hide();
}
else {
return Mono.just(f);
}
})
.onErrorContinue(OnNextFailureStrategyTest::drop);
StepVerifier.create(test)
.expectNoFusionSupport()
.expectNext(2)
.expectComplete()
.verifyThenAssertThat()
// When inner is not a Callable error value is not available.
.hasNotDroppedElements()
.hasDroppedErrors(1);
}
代码示例来源:origin: reactor/reactor-kafka
@Override
public KafkaOutbound<K, V> sendTransactionally(Publisher<? extends Publisher<? extends ProducerRecord<K, V>>> transactionRecords) {
return then(Flux.from(transactionRecords)
.publishOn(sender.senderOptions.scheduler())
.concatMapDelayError(records -> sender.transaction(records), false, 1));
}
代码示例来源:origin: reactor/reactor-kafka
@Override
public <T> Flux<Flux<SenderResult<T>>> sendTransactionally(Publisher<? extends Publisher<? extends SenderRecord<K, V, T>>> transactionRecords) {
UnicastProcessor<Object> processor = UnicastProcessor.create();
return Flux.from(transactionRecords)
.publishOn(senderOptions.scheduler(), false, 1)
.concatMapDelayError(records -> transaction(records, processor), false, 1)
.window(processor)
.doOnTerminate(() -> processor.onComplete())
.doOnCancel(() -> processor.onComplete());
}
内容来源于网络,如有侵权,请联系作者删除!