Using the reactor.core.publisher.Flux.takeWhile() method, with code examples

x33g5p2x, reposted on 2022-01-19 under Other

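This article collects code examples for the Java method reactor.core.publisher.Flux.takeWhile() and shows how Flux.takeWhile() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as a reference. Details of the Flux.takeWhile() method:
Package: reactor.core.publisher
Class name: Flux
Method name: takeWhile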

About Flux.takeWhile

Relay values from this Flux while a predicate returns true for the values (checked before each value is delivered). Only the matching data is included (unlike #takeUntil, which also relays the value that matched its predicate).
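The difference between the two rules can be sketched in plain Java. This is only an illustration of the semantics on a List, not Reactor code: it uses java.util.stream.Stream.takeWhile (Java 9+) in place of Flux.takeWhile, and the class TakeWhileSketch and its takeUntil helper are hypothetical names written for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper class: models the two rules on a List, not on a Flux.
class TakeWhileSketch {

    // takeWhile rule: test each value BEFORE relaying it and stop at the
    // first value that fails the predicate; that value is not relayed.
    static <T> List<T> takeWhile(List<T> source, Predicate<? super T> predicate) {
        return source.stream().takeWhile(predicate).collect(Collectors.toList());
    }

    // takeUntil rule: relay each value first, then stop once the predicate
    // matches; the matching value is included in the result.
    static <T> List<T> takeUntil(List<T> source, Predicate<? super T> stop) {
        List<T> out = new ArrayList<>();
        for (T value : source) {
            out.add(value);
            if (stop.test(value)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        System.out.println(takeWhile(source, v -> v < 4));  // [1, 2, 3]
        System.out.println(takeUntil(source, v -> v >= 4)); // [1, 2, 3, 4]
    }
}
```

With the same boundary at 4, the takeWhile rule drops the 4 while the takeUntil rule keeps it, which is the distinction the description draws.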

Code examples

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void predicateNull() {
  Flux.never()
    .takeWhile(null);
}

Code example source: reactor/reactor-core

private void expectWindow(int index, Predicate<? super Integer> innerCancelPredicate, List<Integer> values) {
  AssertSubscriber<Integer> s = AssertSubscriber.create();
  mainSubscriber.values().get(index)
      .doOnCancel(() -> innerCancelled.incrementAndGet())
      .doOnComplete(() -> innerCompleted.incrementAndGet())
      .doOnTerminate(() -> innerTerminated.incrementAndGet())
      .takeWhile(innerCancelPredicate).subscribe(s);
  s.assertValueSequence(values).assertNoError();
  innerCreated.incrementAndGet();
}

Code example source: reactor/reactor-core

incomingProcessor.next()
         .flatMapMany(flux -> flux
             .takeWhile(s -> !"CHARLIE".equals(s))
             .log(String.format("stream.window.%d", windowIndex.getAndIncrement())))
         .log(String.format("stream.next.%d", nextIndex.getAndIncrement()))

Code example source: reactor/reactor-core

@Test
public void aFluxCanBeLimitedWhile(){
  StepVerifier.create(Flux.just("test", "test2", "test3")
              .takeWhile("test"::equals)
  )
        .expectNext("test")
        .verifyComplete();
}

Code example source: kbastani/spring-cloud-event-sourcing-example

public Order getOrder(String orderId, Boolean validate) {
  // Get the order for the event
  Order order = orderRepository.findOne(orderId);
  if (validate) {
    try {
      // Validate the account number of the event's order belongs to the user
      validateAccountNumber(order.getAccountNumber());
    } catch (Exception ex) {
      return null;
    }
  }
  Flux<OrderEvent> orderEvents =
      Flux.fromStream(orderEventRepository.findOrderEventsByOrderId(order.getOrderId()));
  // Aggregate the state of order
  return orderEvents
      .takeWhile(orderEvent -> orderEvent.getType() != OrderEventType.DELIVERED)
      .reduceWith(() -> order, Order::incorporate)
      // Mono.get() is the blocking call from an early Reactor release; later versions name it block()
      .get();
}

Code example source: kbastani/spring-cloud-event-sourcing-example

/**
 * Aggregate the cart events of a user and return a {@link ShoppingCart} object
 *
 * @param user    is the user to retrieve the shopping cart for
 * @param catalog is the catalog used to generate the shopping cart
 * @return a shopping cart representing the aggregate state of the user's cart
 * @throws Exception
 */
public ShoppingCart aggregateCartEvents(User user, Catalog catalog) throws Exception {
  Flux<CartEvent> cartEvents =
      Flux.fromStream(cartEventRepository.getCartEventStreamByUser(user.getId()));
  // Aggregate the state of the shopping cart
  ShoppingCart shoppingCart = cartEvents
      .takeWhile(cartEvent -> !ShoppingCart.isTerminal(cartEvent.getCartEventType()))
      .reduceWith(() -> new ShoppingCart(catalog), ShoppingCart::incorporate)
      // Mono.get() is the blocking call from an early Reactor release; later versions name it block()
      .get();
  shoppingCart.getLineItems();
  return shoppingCart;
}

Code example source: reactor/reactor-core

@Test
public void predicateThrows() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .takeWhile(v -> {
      throw new RuntimeException("forced failure");
    })
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}

Code example source: reactor/reactor-core

@Test
public void takeSome() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .takeWhile(v -> v < 4)
    .subscribe(ts);
  ts.assertValues(1, 2, 3)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void takeAll() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .takeWhile(v -> true)
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void takeNone() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .takeWhile(v -> false)
    .subscribe(ts);
  ts.assertNoValues()
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void repeatWithVolumeCondition() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 2)
    .repeatWhen(v -> v.takeWhile(n -> n > 0))
    .subscribe(ts);
  ts.request(8);
  ts.assertValues(1, 2, 1, 2, 1, 2, 1, 2)
   .assertNoError()
   .assertNotComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeNoneBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 5)
    .takeWhile(v -> false)
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertNoValues()
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void takeAllBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 5)
    .takeWhile(v -> true)
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertValues(1, 2)
   .assertNoError()
   .assertNotComplete();
  ts.request(10);
  ts.assertValues(1, 2, 3, 4, 5)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void takeSomeBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 5)
    .takeWhile(v -> v < 4)
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertValues(1, 2)
   .assertNoError()
   .assertNotComplete();
  ts.request(10);
  ts.assertValues(1, 2, 3)
   .assertComplete()
   .assertNoError();
}

Code example source: com.aol.cyclops/cyclops-reactor

/**
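 * @param continuePredicate predicate tested against each value; relaying stops at the first value for which it returns false
 * @return a Flux that relays values only while the predicate holds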
 * @see reactor.core.publisher.Flux#takeWhile(java.util.function.Predicate)
 */
public final Flux<T> takeWhile(Predicate<? super T> continuePredicate) {
  return boxed.takeWhile(continuePredicate);
}

Code example source: com.aol.cyclops/cyclops-reactor

@Override
public ReactiveSeq<T> limitWhileClosed(Predicate<? super T> p) {
  return flux(flux.takeWhile(p));
}

Code example source: io.projectreactor.addons/reactor-extra

@Override
public Publisher<Long> apply(Flux<Long> companionValues) {
  Instant timeoutInstant = calculateTimeout();
  DefaultContext<T> context = new DefaultContext<>(applicationContext, 0, null, -1L);
  return companionValues
      .index()
      .map(tuple -> repeatBackoff(tuple.getT2(), tuple.getT1() + 1L, timeoutInstant, context))
      .takeWhile(backoff -> backoff != RETRY_EXHAUSTED)
      .concatMap(backoff -> retryMono(backoff.delay));
}

Code example source: chaokunyang/microservices-event-sourcing

public Order getOrder(String orderId, Boolean validate) {
  // Get the order
  Order order = orderRepositroy.findOne(orderId);
  if (validate) {
    // Validate that the account number of the event's order belongs to the user
    try {
      validateAccountNumber(order.getAccountNumber());
    } catch (Exception e) {
      return null;
    }
  }
  Flux<OrderEvent> orderEvents = Flux.fromStream(orderEventRepository.findOrderEventsByOrderId(orderId));
  // Aggregate the state of the order
  return orderEvents.takeWhile(orderEvent -> orderEvent.getType() != OrderEventType.DELIVERED)
      .reduceWith(() -> order, Order::incorporate)
      // Mono.get() is the blocking call from an early Reactor release; later versions name it block()
      .get();
}

Code example source: chaokunyang/microservices-event-sourcing

/**
 * Aggregate the cart events of a user and return a {@link ShoppingCart}
 * @param user the user to retrieve the shopping cart for
 * @param catalog the catalog used to generate the shopping cart
 * @return a shopping cart representing the aggregate state of the user's cart
 * @throws Exception if a product in the shopping cart is not in the catalog
 */
public ShoppingCart aggregateCartEvents(User user, Catalog catalog) throws Exception {
  Flux<CartEvent> cartEvents = Flux.fromStream(cartEventRepository.getCartEventStreamByUserId(user.getId()));
  // Aggregate the state of the shopping cart
  ShoppingCart shoppingCart = cartEvents.takeWhile(cartEvent -> !ShoppingCart.isTerminal(cartEvent.getCartEventType()))
      .reduceWith(() -> new ShoppingCart(catalog), ShoppingCart::incorporate)
      // Mono.get() is the blocking call from an early Reactor release; later versions name it block()
      .get();
  shoppingCart.getCartItems();
  return shoppingCart;
}
