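This article collects code examples for the Java method reactor.core.publisher.Flux.takeWhile() and shows how Flux.takeWhile() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and should serve as a useful reference. Details of Flux.takeWhile() are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: takeWhile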
Relay values from this Flux while a predicate returns TRUE for the values (checked before each value is delivered). This only includes the matching data (unlike #takeUntil).
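The difference from takeUntil noted in the description can be seen in a small side-by-side sketch. The class and method names here are hypothetical, and reactor-core is assumed to be on the classpath; the first value that fails the takeWhile predicate is dropped, whereas takeUntil still emits the value that matches its predicate.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; assumes reactor-core is available.
public class TakeWhileVsTakeUntil {

    // takeWhile: relays values only while the predicate holds. The first
    // failing value (4) is not emitted and the sequence completes.
    static List<Integer> takeWhileResult() {
        return Flux.range(1, 5)
                .takeWhile(v -> v < 4)
                .collectList()
                .block();
    }

    // takeUntil: relays values until the predicate matches. The matching
    // value (4) is still emitted before the sequence completes.
    static List<Integer> takeUntilResult() {
        return Flux.range(1, 5)
                .takeUntil(v -> v >= 4)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(takeWhileResult()); // [1, 2, 3]
        System.out.println(takeUntilResult()); // [1, 2, 3, 4]
    }
}
```

block() is used here only to keep the sketch short; in reactive code the result would normally be subscribed to rather than blocked on.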
Code example source: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void predicateNull() {
    Flux.never()
        .takeWhile(null);
}
Code example source: reactor/reactor-core
private void expectWindow(int index, Predicate<? super Integer> innerCancelPredicate, List<Integer> values) {
    AssertSubscriber<Integer> s = AssertSubscriber.create();
    mainSubscriber.values().get(index)
        .doOnCancel(() -> innerCancelled.incrementAndGet())
        .doOnComplete(() -> innerCompleted.incrementAndGet())
        .doOnTerminate(() -> innerTerminated.incrementAndGet())
        .takeWhile(innerCancelPredicate).subscribe(s);
    s.assertValueSequence(values).assertNoError();
    innerCreated.incrementAndGet();
}
Code example source: reactor/reactor-core
incomingProcessor.next()
    .flatMapMany(flux -> flux
        .takeWhile(s -> !"CHARLIE".equals(s))
        .log(String.format("stream.window.%d", windowIndex.getAndIncrement())))
    .log(String.format("stream.next.%d", nextIndex.getAndIncrement()))
Code example source: reactor/reactor-core
@Test
public void aFluxCanBeLimitedWhile() {
    StepVerifier.create(Flux.just("test", "test2", "test3")
                            .takeWhile("test"::equals))
                .expectNext("test")
                .verifyComplete();
}
Code example source: kbastani/spring-cloud-event-sourcing-example
public Order getOrder(String orderId, Boolean validate) {
    // Get the order for the event
    Order order = orderRepository.findOne(orderId);
    if (validate) {
        try {
            // Validate the account number of the event's order belongs to the user
            validateAccountNumber(order.getAccountNumber());
        } catch (Exception ex) {
            return null;
        }
    }
    Flux<OrderEvent> orderEvents =
        Flux.fromStream(orderEventRepository.findOrderEventsByOrderId(order.getOrderId()));
    // Aggregate the state of order
    return orderEvents
        .takeWhile(orderEvent -> orderEvent.getType() != OrderEventType.DELIVERED)
        .reduceWith(() -> order, Order::incorporate)
        .get();
}
Code example source: kbastani/spring-cloud-event-sourcing-example
/**
 * Aggregate the cart events of a user and return a {@link ShoppingCart} object
 *
 * @param user is the user to retrieve the shopping cart for
 * @param catalog is the catalog used to generate the shopping cart
 * @return a shopping cart representing the aggregate state of the user's cart
 * @throws Exception
 */
public ShoppingCart aggregateCartEvents(User user, Catalog catalog) throws Exception {
    Flux<CartEvent> cartEvents =
        Flux.fromStream(cartEventRepository.getCartEventStreamByUser(user.getId()));
    // Aggregate the state of the shopping cart
    ShoppingCart shoppingCart = cartEvents
        .takeWhile(cartEvent -> !ShoppingCart.isTerminal(cartEvent.getCartEventType()))
        .reduceWith(() -> new ShoppingCart(catalog), ShoppingCart::incorporate)
        .get();
    shoppingCart.getLineItems();
    return shoppingCart;
}
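The two event-sourcing snippets above share one pattern: replay events until a terminal one is seen, then fold the remaining prefix into an aggregate. A self-contained sketch of that pattern follows. The Event and Type classes and the itemCount method are hypothetical stand-ins for the project's own types, and reactor-core is assumed to be on the classpath. The snippets call get() on the resulting Mono, which appears to come from an early Reactor release; this sketch uses block(), the blocking call in current Reactor versions.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical sketch of "aggregate until a terminal event"; not the project's code.
public class CartAggregationSketch {

    // Hypothetical event types; CHECKOUT plays the role of the terminal event.
    enum Type { ADD, REMOVE, CHECKOUT }

    // Hypothetical event: a type plus a quantity.
    static final class Event {
        final Type type;
        final int quantity;

        Event(Type type, int quantity) {
            this.type = type;
            this.quantity = quantity;
        }
    }

    // Folds events into an item count. The terminal event and everything
    // after it are excluded because takeWhile stops at the first failing value.
    static int itemCount(List<Event> events) {
        return Flux.fromIterable(events)
                .takeWhile(e -> e.type != Type.CHECKOUT)
                .reduceWith(() -> 0, (count, e) ->
                        e.type == Type.ADD ? count + e.quantity : count - e.quantity)
                .block();
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(Type.ADD, 2),
                new Event(Type.ADD, 3),
                new Event(Type.REMOVE, 1),
                new Event(Type.CHECKOUT, 0),
                new Event(Type.ADD, 10)); // ignored: arrives after the terminal event
        System.out.println(itemCount(events)); // 4
    }
}
```

Because reduceWith is given a seed, an event list that starts with the terminal event still yields the seed value rather than an empty result.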
Code example source: reactor/reactor-core
@Test
public void predicateThrows() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 5)
        .takeWhile(v -> {
            throw new RuntimeException("forced failure");
        })
        .subscribe(ts);
    ts.assertNoValues()
      .assertNotComplete()
      .assertError(RuntimeException.class)
      .assertErrorMessage("forced failure");
}
Code example source: reactor/reactor-core
@Test
public void takeSome() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 5)
        .takeWhile(v -> v < 4)
        .subscribe(ts);
    ts.assertValues(1, 2, 3)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void takeAll() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 5)
        .takeWhile(v -> true)
        .subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void takeNone() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 5)
        .takeWhile(v -> false)
        .subscribe(ts);
    ts.assertNoValues()
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void repeatWithVolumeCondition() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 2)
        .repeatWhen(v -> v.takeWhile(n -> n > 0))
        .subscribe(ts);
    ts.request(8);
    ts.assertValues(1, 2, 1, 2, 1, 2, 1, 2)
      .assertNoError()
      .assertNotComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeNoneBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 5)
        .takeWhile(v -> false)
        .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertNotComplete();
    ts.request(2);
    ts.assertNoValues()
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void takeAllBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 5)
        .takeWhile(v -> true)
        .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertNotComplete();
    ts.request(2);
    ts.assertValues(1, 2)
      .assertNoError()
      .assertNotComplete();
    ts.request(10);
    ts.assertValues(1, 2, 3, 4, 5)
      .assertComplete()
      .assertNoError();
}
Code example source: reactor/reactor-core
@Test
public void takeSomeBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 5)
        .takeWhile(v -> v < 4)
        .subscribe(ts);
    ts.assertNoValues()
      .assertNoError()
      .assertNotComplete();
    ts.request(2);
    ts.assertValues(1, 2)
      .assertNoError()
      .assertNotComplete();
    ts.request(10);
    ts.assertValues(1, 2, 3)
      .assertComplete()
      .assertNoError();
}
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param continuePredicate predicate tested against each value; relaying stops at the first value that fails it
 * @return a Flux that relays values while the predicate holds
 * @see reactor.core.publisher.Flux#takeWhile(java.util.function.Predicate)
 */
public final Flux<T> takeWhile(Predicate<? super T> continuePredicate) {
    return boxed.takeWhile(continuePredicate);
}
Code example source: com.aol.cyclops/cyclops-reactor
@Override
public ReactiveSeq<T> limitWhileClosed(Predicate<? super T> p) {
    return flux(flux.takeWhile(p));
}
Code example source: io.projectreactor.addons/reactor-extra
@Override
public Publisher<Long> apply(Flux<Long> companionValues) {
    Instant timeoutInstant = calculateTimeout();
    DefaultContext<T> context = new DefaultContext<>(applicationContext, 0, null, -1L);
    return companionValues
        .index()
        .map(tuple -> repeatBackoff(tuple.getT2(), tuple.getT1() + 1L, timeoutInstant, context))
        .takeWhile(backoff -> backoff != RETRY_EXHAUSTED)
        .concatMap(backoff -> retryMono(backoff.delay));
}
Code example source: chaokunyang/microservices-event-sourcing
public Order getOrder(String orderId, Boolean validate) {
    // Get the order
    Order order = orderRepositroy.findOne(orderId);
    if (validate) {
        // Validate that the account number of the event's order belongs to the user
        try {
            validateAccountNumber(order.getAccountNumber());
        } catch (Exception e) {
            return null;
        }
    }
    Flux<OrderEvent> orderEvents = Flux.fromStream(orderEventRepository.findOrderEventsByOrderId(orderId));
    // Aggregate the order state
    return orderEvents.takeWhile(orderEvent -> orderEvent.getType() != OrderEventType.DELIVERED)
        .reduceWith(() -> order, Order::incorporate)
        .get();
}
Code example source: chaokunyang/microservices-event-sourcing
/**
 * Aggregate the cart events of a user and return a {@link ShoppingCart}
 * @param user the user whose shopping cart is retrieved
 * @param catalog the catalog used to generate the shopping cart
 * @return a shopping cart representing the aggregate state of the user's cart
 * @throws Exception if a product in the shopping cart is not in the catalog
 */
public ShoppingCart aggregateCartEvents(User user, Catalog catalog) throws Exception {
    Flux<CartEvent> cartEvents = Flux.fromStream(cartEventRepository.getCartEventStreamByUserId(user.getId()));
    // Aggregate the state of the shopping cart
    ShoppingCart shoppingCart = cartEvents.takeWhile(cartEvent -> !ShoppingCart.isTerminal(cartEvent.getCartEventType()))
        .reduceWith(() -> new ShoppingCart(catalog), ShoppingCart::incorporate)
        .get();
    shoppingCart.getCartItems();
    return shoppingCart;
}