Usage and code examples for the reactor.core.publisher.Flux.takeUntil() method

x33g5p2x, reposted on 2022-01-19 under Other

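This article collects code examples for the Java method reactor.core.publisher.Flux.takeUntil() and shows how Flux.takeUntil() is used in practice. The examples are drawn mainly from GitHub, Stack Overflow, and Maven artifacts, and were extracted from selected projects. Details of the method are as follows:
Package: reactor.core.publisher
Class: Flux
Method: takeUntil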

About Flux.takeUntil

Relay values from this Flux until the given Predicate matches. The matching value is itself emitted before the sequence completes (unlike takeWhile, which drops the first value that fails its predicate).
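
The inclusive/exclusive difference described above can be seen side by side in a minimal sketch. The class and method names here are illustrative only (they do not come from any of the quoted projects), and reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; only Flux.range, takeUntil, takeWhile,
// collectList and block are Reactor API.
class TakeUntilVsTakeWhile {

    // takeUntil relays values up to AND including the first match.
    static List<Integer> untilThree() {
        return Flux.range(1, 5)
                .takeUntil(v -> v == 3)
                .collectList()
                .block();
    }

    // takeWhile relays values only while the predicate holds;
    // the first value that fails it is not emitted.
    static List<Integer> whileBelowThree() {
        return Flux.range(1, 5)
                .takeWhile(v -> v < 3)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(untilThree());      // [1, 2, 3]
        System.out.println(whileBelowThree()); // [1, 2]
    }
}
```

block() is used only to keep the demo short; in reactive code the result would normally be consumed by a subscriber.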

Code examples

Code example source: spring-projects/spring-framework

/**
 * Relay buffers from the given {@link Publisher} until the total
 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
 * the given maximum byte count, or until the publisher is complete.
 * @param publisher the publisher to filter
 * @param maxByteCount the maximum byte count
 * @return a flux whose maximum byte count is {@code maxByteCount}
 */
public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
  Assert.notNull(publisher, "Publisher must not be null");
  Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
  return Flux.defer(() -> {
    AtomicLong countDown = new AtomicLong(maxByteCount);
    return Flux.from(publisher)
        .map(buffer -> {
          long remainder = countDown.addAndGet(-buffer.readableByteCount());
          if (remainder < 0) {
            int length = buffer.readableByteCount() + (int) remainder;
            return buffer.slice(0, length);
          }
          else {
            return buffer;
          }
        })
        .takeUntil(buffer -> countDown.get() <= 0);
  }); // no doOnDiscard necessary, as this method does not drop buffers
}
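
The countdown-plus-takeUntil pattern above can be tried without Spring's DataBuffer. The sketch below is a simplified stand-in, not the Spring implementation: it assumes plain byte[] chunks in place of DataBuffer and uses Arrays.copyOfRange where the original calls slice. The class name and the chunkSizes() helper are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

// Simplified model of the pattern with byte[] chunks (an assumption;
// the original operates on Spring DataBuffer instances).
class TakeUntilByteCountSketch {

    static Flux<byte[]> takeUntilByteCount(Flux<byte[]> source, long maxByteCount) {
        // defer gives every subscriber its own counter
        return Flux.defer(() -> {
            AtomicLong countDown = new AtomicLong(maxByteCount);
            return source
                    .map(chunk -> {
                        long remainder = countDown.addAndGet(-chunk.length);
                        if (remainder < 0) {
                            // this chunk crosses the limit: keep only the part that fits
                            int length = chunk.length + (int) remainder;
                            return Arrays.copyOfRange(chunk, 0, length);
                        }
                        return chunk;
                    })
                    // inclusive: the (possibly trimmed) chunk that exhausts the budget is still emitted
                    .takeUntil(chunk -> countDown.get() <= 0);
        });
    }

    // Three 4-byte chunks with a 6-byte limit.
    static List<Integer> chunkSizes() {
        Flux<byte[]> source = Flux.just(new byte[4], new byte[4], new byte[4]);
        return takeUntilByteCount(source, 6)
                .map(chunk -> chunk.length)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes()); // [4, 2]
    }
}
```

takeUntil rather than takeWhile matters here: the chunk that brings the counter to zero or below must still be relayed, trimmed if necessary, and takeWhile would drop it.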

Code example source: org.springframework/spring-core (identical to the takeUntilByteCount example from spring-projects/spring-framework above)

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void predicateNull() {
  Flux.never()
    .takeUntil(null);
}

Code example source: spring-projects/spring-framework

@Test
public void writeAndFlushWith() {
  Mono<String> result = this.webClient.get()
      .uri("/write-and-flush")
      .retrieve()
      .bodyToFlux(String.class)
      .takeUntil(s -> s.endsWith("data1"))
      .reduce((s1, s2) -> s1 + s2);
  StepVerifier.create(result)
      .expectNext("data0data1")
      .expectComplete()
      .verify(Duration.ofSeconds(10L));
}
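
The same takeUntil-then-reduce shape can be reproduced without a WebClient. The following is a sketch with an in-memory source; the class name, method name, and the extra "data2" element are illustrative assumptions.

```java
import reactor.core.publisher.Flux;

// Hypothetical stand-in for the streamed response body in the test above.
class TakeUntilThenReduce {

    static String joinUntilMarker() {
        return Flux.just("data0", "data1", "data2")
                // stop after the first element ending in "data1" (it is still relayed)
                .takeUntil(s -> s.endsWith("data1"))
                // concatenate whatever was relayed
                .reduce((s1, s2) -> s1 + s2)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(joinUntilMarker()); // data0data1
    }
}
```

"data2" never reaches reduce, because takeUntil completes the sequence and cancels the upstream once the marker element has been relayed.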

Code example source: reactor/reactor-core

@Test
public void predicateThrows() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .takeUntil(v -> {
      throw new RuntimeException("forced failure");
    })
    .subscribe(ts);
  ts.assertValues(1)
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}
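
The test above asserts that the value 1 is delivered even though the predicate throws on its first call: the value is relayed first and the predicate is evaluated afterwards. A minimal sketch of the same behaviour without AssertSubscriber follows; the class and method names are hypothetical, and the error is swallowed with onErrorResume only to keep the demo short.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo: records what reaches downstream before the predicate fails.
class PredicateThrowsSketch {

    static List<Integer> seenBeforeError() {
        List<Integer> seen = new ArrayList<>();
        Flux.range(1, 5)
                .takeUntil(v -> {
                    throw new IllegalStateException("forced failure");
                })
                .doOnNext(seen::add)               // records values relayed downstream
                .onErrorResume(e -> Flux.empty())  // replace the error with completion
                .blockLast();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(seenBeforeError()); // [1]
    }
}
```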

Code example source: reactor/reactor-core

@Test
public void takeSome() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .takeUntil(v -> v == 3)
    .subscribe(ts);
  ts.assertValues(1, 2, 3)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void stopImmediately() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .takeUntil(v -> true)
    .subscribe(ts);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void takeAll() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 5)
    .takeUntil(v -> false)
    .subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void stopImmediatelyBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 5)
    .takeUntil(v -> true)
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertValues(1)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void takeAllBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 5)
    .takeUntil(v -> false)
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertValues(1, 2)
   .assertNoError()
   .assertNotComplete();
  ts.request(10);
  ts.assertValues(1, 2, 3, 4, 5)
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void takeSomeBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 5)
    .takeUntil(v -> v == 3)
    .subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertValues(1, 2)
   .assertNoError()
   .assertNotComplete();
  ts.request(10);
  ts.assertValues(1, 2, 3)
   .assertComplete()
   .assertNoError();
}

Code example source: com.aol.cyclops/cyclops-reactor

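/**
 * Delegates to the wrapped Flux.
 * @param predicate stops the sequence once it matches; the matching element is still emitted
 * @return a Flux that relays elements up to and including the first match
 * @see reactor.core.publisher.Flux#takeUntil(java.util.function.Predicate)
 */
public final Flux<T> takeUntil(Predicate<? super T> predicate) {
  return boxed.takeUntil(predicate);
}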

Code example source: com.aol.cyclops/cyclops-reactor

@Override
public ReactiveSeq<T> limitUntilClosed(Predicate<? super T> p) {
  return flux(flux.takeUntil(p));
}

Code example source: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-core and apache/servicemix-bundles (both identical to the takeUntilByteCount example from spring-projects/spring-framework above)

Code example source: reactor/reactor-netty

// Excerpt: the enclosing handler code is elided in the source.
in.receive()
  .asString()
  .takeUntil(s -> s.endsWith("CONTROL"))
  .map(s -> "ECHO: " + s.replaceAll("CONTROL", ""))
  .concatWith(Mono.just("DONE"))
// ...
.then(in.receive()
        .asString()
        .takeUntil(s -> s.endsWith("DONE"))
        .map(s -> s.replaceAll("DONE", ""))
        .filter(s -> !s.isEmpty())
// ... (the rest of the chain is elided in the source)

Code example source: reactor/reactor-netty

in.receive()
 .asString(StandardCharsets.UTF_8)
 .takeUntil(d -> d.contains("<- 1024 mark here"))
 .reduceWith(String::new, String::concat)
 .log("-----------------CLIENT2")
