本文整理了Java中reactor.core.publisher.Flux.takeUntil()
方法的一些代码示例,展示了Flux.takeUntil()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.takeUntil()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:takeUntil
[英]Relay values from this Flux until the given Predicate matches. This includes the matching data (unlike #takeWhile).
[中]中继此流量的值,直到给定谓词匹配为止。这包括匹配数据(与#takeWhile不同)。
代码示例来源:origin: spring-projects/spring-framework
/**
* Relay buffers from the given {@link Publisher} until the total
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
* the given maximum byte count, or until the publisher is complete.
* @param publisher the publisher to filter
* @param maxByteCount the maximum byte count
* @return a flux whose maximum byte count is {@code maxByteCount}
*/
public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
return Flux.defer(() -> {
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.map(buffer -> {
long remainder = countDown.addAndGet(-buffer.readableByteCount());
if (remainder < 0) {
int length = buffer.readableByteCount() + (int) remainder;
return buffer.slice(0, length);
}
else {
return buffer;
}
})
.takeUntil(buffer -> countDown.get() <= 0);
}); // no doOnDiscard necessary, as this method does not drop buffers
}
代码示例来源:origin: org.springframework/spring-core
/**
* Relay buffers from the given {@link Publisher} until the total
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
* the given maximum byte count, or until the publisher is complete.
* @param publisher the publisher to filter
* @param maxByteCount the maximum byte count
* @return a flux whose maximum byte count is {@code maxByteCount}
*/
public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
return Flux.defer(() -> {
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.map(buffer -> {
long remainder = countDown.addAndGet(-buffer.readableByteCount());
if (remainder < 0) {
int length = buffer.readableByteCount() + (int) remainder;
return buffer.slice(0, length);
}
else {
return buffer;
}
})
.takeUntil(buffer -> countDown.get() <= 0);
}); // no doOnDiscard necessary, as this method does not drop buffers
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void predicateNull() {
Flux.never()
.takeUntil(null);
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void writeAndFlushWith() {
Mono<String> result = this.webClient.get()
.uri("/write-and-flush")
.retrieve()
.bodyToFlux(String.class)
.takeUntil(s -> s.endsWith("data1"))
.reduce((s1, s2) -> s1 + s2);
StepVerifier.create(result)
.expectNext("data0data1")
.expectComplete()
.verify(Duration.ofSeconds(10L));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void predicateThrows() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 5)
.takeUntil(v -> {
throw new RuntimeException("forced failure");
}).subscribe(ts);
ts.assertValues(1)
.assertNotComplete()
.assertError(RuntimeException.class)
.assertErrorMessage("forced failure");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void takeSome() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 5)
.takeUntil(v -> v == 3)
.subscribe(ts);
ts.assertValues(1, 2, 3)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void stopImmediately() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 5)
.takeUntil(v -> true)
.subscribe(ts);
ts.assertValues(1)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void takeAll() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Flux.range(1, 5)
.takeUntil(v -> false)
.subscribe(ts);
ts.assertValues(1, 2, 3, 4, 5)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void stopImmediatelyBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 5)
.takeUntil(v -> true)
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertNotComplete();
ts.request(2);
ts.assertValues(1)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void takeAllBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 5)
.takeUntil(v -> false)
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertNotComplete();
ts.request(2);
ts.assertValues(1, 2)
.assertNoError()
.assertNotComplete();
ts.request(10);
ts.assertValues(1, 2, 3, 4, 5)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void takeSomeBackpressured() {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.range(1, 5)
.takeUntil(v -> v == 3)
.subscribe(ts);
ts.assertNoValues()
.assertNoError()
.assertNotComplete();
ts.request(2);
ts.assertValues(1, 2)
.assertNoError()
.assertNotComplete();
ts.request(10);
ts.assertValues(1, 2, 3)
.assertComplete()
.assertNoError();
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param predicate
* @return
* @see reactor.core.publisher.Flux#takeUntil(java.util.function.Predicate)
*/
public final Flux<T> takeUntil(Predicate<? super T> predicate) {
return boxed.takeUntil(predicate);
}
/**
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
@Override
public ReactiveSeq<T> limitUntilClosed(Predicate<? super T> p) {
return flux(flux.takeUntil(p));
}
代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-core
/**
* Relay buffers from the given {@link Publisher} until the total
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
* the given maximum byte count, or until the publisher is complete.
* @param publisher the publisher to filter
* @param maxByteCount the maximum byte count
* @return a flux whose maximum byte count is {@code maxByteCount}
*/
public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
return Flux.defer(() -> {
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.map(buffer -> {
long remainder = countDown.addAndGet(-buffer.readableByteCount());
if (remainder < 0) {
int length = buffer.readableByteCount() + (int) remainder;
return buffer.slice(0, length);
}
else {
return buffer;
}
})
.takeUntil(buffer -> countDown.get() <= 0);
}); // no doOnDiscard necessary, as this method does not drop buffers
}
代码示例来源:origin: apache/servicemix-bundles
/**
* Relay buffers from the given {@link Publisher} until the total
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
* the given maximum byte count, or until the publisher is complete.
* @param publisher the publisher to filter
* @param maxByteCount the maximum byte count
* @return a flux whose maximum byte count is {@code maxByteCount}
*/
public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publisher, long maxByteCount) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
return Flux.defer(() -> {
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.map(buffer -> {
long remainder = countDown.addAndGet(-buffer.readableByteCount());
if (remainder < 0) {
int length = buffer.readableByteCount() + (int) remainder;
return buffer.slice(0, length);
}
else {
return buffer;
}
})
.takeUntil(buffer -> countDown.get() <= 0);
}); // no doOnDiscard necessary, as this method does not drop buffers
}
代码示例来源:origin: reactor/reactor-netty
in.receive()
.asString()
.takeUntil(s -> s.endsWith("CONTROL"))
.map(s -> "ECHO: " + s.replaceAll("CONTROL", ""))
.concatWith(Mono.just("DONE"))
.then(in.receive()
.asString()
.takeUntil(s -> s.endsWith("DONE"))
.map(s -> s.replaceAll("DONE", ""))
.filter(s -> !s.isEmpty())
.then(in.receive()
.asString()
.takeUntil(s -> s.endsWith("DONE"))
.map(s -> s.replaceAll("DONE", ""))
.filter(s -> !s.isEmpty())
代码示例来源:origin: reactor/reactor-netty
in.receive()
.asString(StandardCharsets.UTF_8)
.takeUntil(d -> d.contains("<- 1024 mark here"))
.reduceWith(String::new, String::concat)
.log("-----------------CLIENT2")
内容来源于网络,如有侵权,请联系作者删除!