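This article collects code examples for the Java method reactor.core.publisher.Flux.zipWith() and shows how Flux.zipWith() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.zipWith() method are as follows: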
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: zipWith
Description: Zip this Flux with another Publisher source, that is to say wait for both to emit one element and combine these elements once into a Tuple2. The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
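The pairing rule described above (take one element from each source, combine them, stop as soon as either source is exhausted) can be modeled in plain Java without a Reactor dependency. The sketch below is only an illustration over in-memory lists, not Reactor's implementation: the class name ZipSketch and the helper zip are hypothetical, and it ignores asynchrony, backpressure, and error forwarding. It uses a combinator function, mirroring the zipWith(other, combinator) overload used in the examples that follow, rather than producing a Tuple2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper class: a synchronous model of zip semantics, not part of Reactor.
final class ZipSketch {

    // Combines elements pairwise and stops when either input runs out.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combinator) {
        List<R> out = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            out.add(combinator.apply(l.next(), r.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Different lengths: the extra element 3 on the right is never paired.
        List<Integer> sums = zip(Arrays.asList(1, 2), Arrays.asList(1, 2, 3), (a, b) -> a + b);
        System.out.println(sums); // prints [2, 4]

        // One empty side: nothing can be paired, so the result is empty.
        List<Integer> none = zip(Arrays.asList(1, 2, 3), Collections.<Integer>emptyList(), (a, b) -> a + b);
        System.out.println(none); // prints []
    }
}
```

The two calls in main correspond to the differentLength and nonEmptyAndEmpty tests listed below, which assert the same values (2, 4 and no values) against the real operator.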
Code example source: reactor/reactor-core
Flux<Integer> exponentialRepeatScenario1() {
    AtomicInteger i = new AtomicInteger();
    return Mono.fromCallable(i::incrementAndGet)
               // Pair each repeat signal with 1, 2, 3 and wait that many seconds before resubscribing.
               .repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                                           .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
Code example source: reactor/reactor-core
Mono<String> exponentialRetryScenario() {
    AtomicInteger i = new AtomicInteger();
    return Mono.<String>create(s -> {
        if (i.incrementAndGet() == 4) {
            s.success("hey");
        }
        else {
            s.error(new RuntimeException("test " + i));
        }
    }).retryWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
Code example source: reactor/reactor-core
Flux<String> exponentialRetryScenario() {
    AtomicInteger i = new AtomicInteger();
    return Flux.<String>create(s -> {
        if (i.incrementAndGet() == 4) {
            s.next("hey");
        }
        else {
            s.error(new RuntimeException("test " + i));
        }
    }).retryWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
Code example source: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_errorFromUpstreamFailure() {
    return Arrays.asList(
            scenario(f -> f.zipWith(Flux.just(1, 2, 3), 3, (a, b) -> a))
                    .prefetch(3),
            scenario(f -> f.zipWith(Flux.<String>error(exception()),
                    (a, b) -> a)).shouldHitDropErrorHookAfterTerminate(false),
            scenario(f -> f.zipWith(Flux.<String>error(exception()).hide(),
                    (a, b) -> a)));
}
Code example source: reactor/reactor-core
Flux<String> exponentialRepeatScenario2() {
    AtomicInteger i = new AtomicInteger();
    return Mono.<String>create(s -> {
        if (i.incrementAndGet() == 4) {
            s.success("hey");
        }
        else {
            s.success();
        }
    }).repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                                  .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
Code example source: reactor/reactor-core
@Test
public void syncFusionMapToNull() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        // The second source maps its element 2 to null, which is not allowed.
        .zipWith(Flux.fromIterable(Arrays.asList(1, 2))
                     .map(v -> v == 2 ? null : v), (a, b) -> a + b)
        .subscribe(ts);
    ts.assertValues(2)
      .assertError(NullPointerException.class)
      .assertNotComplete();
}
Code example source: reactor/reactor-core
/**
 * See https://github.com/reactor/reactor-core/issues/453
 */
@Test
public void testDrainSyncCompletesSeveralBatches() {
    //both hide and just with 2 elements are necessary to go into SYNC mode
    StepVerifier.create(Flux.just(1, 2)
                            .flatMapIterable(t -> IntStream.rangeClosed(0, 35).boxed().collect(Collectors.toList()))
                            .hide()
                            .zipWith(Flux.range(1000, 100))
                            .count())
                .expectNext(72L)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void sameLength() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source = Flux.fromIterable(Arrays.asList(1, 2));
    source.zipWith(source, (a, b) -> a + b)
          .subscribe(ts);
    ts.assertValues(2, 4)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void differentLengthOpt() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source1 = Flux.fromIterable(Arrays.asList(1, 2));
    Flux<Integer> source2 = Flux.just(1, 2, 3);
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertValues(2, 4)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void nonEmptyAndEmpty() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source1 = Flux.just(1, 2, 3);
    Flux<Integer> source2 = Flux.fromIterable(Collections.emptyList());
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void emptyNonEmpty() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source1 = Flux.fromIterable(Collections.emptyList());
    Flux<Integer> source2 = Flux.just(1, 2, 3);
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void differentLength() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source1 = Flux.fromIterable(Arrays.asList(1, 2));
    Flux<Integer> source2 = Flux.just(1, 2, 3);
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertValues(2, 4)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
/**
 * See https://github.com/reactor/reactor-core/issues/453
 */
@Test
public void testDrainAsyncCompletesSeveralBatches() {
    StepVerifier.create(Flux.range(0, 72)
                            .collectList()
                            .flatMapIterable(Function.identity())
                            .zipWith(Flux.range(1000, 100))
                            .count())
                .expectNext(72L)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void sameLengthOptimized() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source = Flux.just(1, 2);
    source.zipWith(source, (a, b) -> a + b)
          .subscribe(ts);
    ts.assertValues(2, 4)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void errorHandlingRetryWhenEquatesRetry() {
    Flux<String> flux =
            Flux.<String>error(new IllegalArgumentException())
                .retryWhen(companion -> companion
                        // Pair each error with an index from 1 to 4.
                        .zipWith(Flux.range(1, 4), (error, index) -> {
                            // Indexes 1 to 3 emit a value, which triggers a retry.
                            if (index < 4) return index;
                            // On the fourth error, rethrow the original exception.
                            else throw Exceptions.propagate(error);
                        })
                );
    StepVerifier.create(flux)
                .verifyError(IllegalArgumentException.class);
    StepVerifier.create(Flux.<String>error(new IllegalArgumentException()).retry(3))
                .verifyError();
}
Code example source: reactor/reactor-core
@Test
public void emptyScalar() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source1 = Flux.empty();
    Flux<Integer> source2 = Flux.just(1);
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void scalarNonScalar() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source1 = Flux.just(1);
    Flux<Integer> source2 = Flux.just(1, 2, 3);
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertValues(2)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void scalarNonScalarOpt() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source1 = Flux.just(1);
    Flux<Integer> source2 = Flux.just(1, 2, 3);
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertValues(2)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void scalarScalar() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux<Integer> source1 = Flux.just(1);
    Flux<Integer> source2 = Flux.just(1);
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertValues(2)
      .assertNoError()
      .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void scalarNonScalarBackpressured() {
    // Start with zero demand so nothing is emitted until request(1).
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux<Integer> source1 = Flux.just(1);
    Flux<Integer> source2 = Flux.just(1, 2, 3);
    source1.zipWith(source2, (a, b) -> a + b)
           .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertNotComplete();
    ts.request(1);
    ts.assertValues(2)
      .assertNoError()
      .assertComplete();
}