本文整理了Java中reactor.core.publisher.Flux.withLatestFrom()
方法的一些代码示例,展示了Flux.withLatestFrom()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.withLatestFrom()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:withLatestFrom
[英]Combine the most recently emitted values from both this Flux and another Publisher through a BiFunction and emits the result.
The operator will drop values from this Flux until the other Publisher produces any value.
If the other Publisher completes without any value, the sequence is completed.
[中]通过双函数将此流量和另一个发布者最近发出的值组合起来,并发出结果。
操作符将从该流量中删除值,直到其他发布者生成任何值。
如果其他发布服务器在没有任何值的情况下完成,则序列完成。
代码示例来源:origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void otherNull() {
Flux.never()
.withLatestFrom(null, (a, b) -> a);
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void combinerNull() {
Flux.never()
.withLatestFrom(Flux.never(), null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void combinerThrows() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10).<Integer, Integer>withLatestFrom(Flux.just(10),
(a, b) -> {
throw new RuntimeException("forced failure");
}).subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertError(RuntimeException.class)
.assertErrorWith( e -> Assert.assertTrue(e.getMessage().contains("forced failure")));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void combineLatest2Null() {
StepVerifier.create(Flux.just(1).withLatestFrom(Flux.just(2), (a, b) ->
null))
.verifyError(NullPointerException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normal() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.withLatestFrom(Flux.just(10), (a, b) -> a + b)
.subscribe
(ts);
ts.assertValues(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void combinerReturnsNull() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.withLatestFrom(Flux.just(10), (a, b) -> (Integer) null)
.subscribe(ts);
ts.assertNoValues()
.assertNotComplete()
.assertError(NullPointerException.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void otherIsNever() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 10)
.withLatestFrom(Flux.<Integer>empty(), (a, b) -> a + b)
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void otherIsEmpty() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 10)
.withLatestFrom(Flux.<Integer>empty(), (a, b) -> a + b)
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void normalBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 10)
.withLatestFrom(Flux.just(10), (a, b) -> a + b)
.subscribe
(ts);
ts.assertNoValues()
.assertNotComplete()
.assertNoError();
ts.request(2);
ts.assertValues(11, 12)
.assertNotComplete()
.assertNoError();
ts.request(5);
ts.assertValues(11, 12, 13, 14, 15, 16, 17)
.assertNotComplete()
.assertNoError();
ts.request(10);
ts.assertValues(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param other
* @param resultSelector
* @return
* @see reactor.core.publisher.Flux#withLatestFrom(org.reactivestreams.Publisher, java.util.function.BiFunction)
*/
public final <U, R> Flux<R> withLatestFrom(Publisher<? extends U> other,
BiFunction<? super T, ? super U, ? extends R> resultSelector) {
return boxed.withLatestFrom(other, resultSelector);
}
/**
代码示例来源:origin: org.springframework.cloud/spring-cloud-function-stream
protected Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
Function<Publisher<?>, Publisher<?>> function = functionCatalog.lookup(Function.class, name);
return flux.publish(values -> {
Publisher<?> result = function
.apply(values.map(message -> convertInput(function).apply(message)));
if (this.functionInspector.isMessage(function)) {
result = Flux.from(result)
.map(message -> MessageUtils.unpack(function, message));
}
Flux<Map<String, Object>> aggregate = headers(values);
return aggregate.withLatestFrom(result,
(map, payload) -> message(map, payload));
});
}
内容来源于网络,如有侵权,请联系作者删除!