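This article collects code examples for the Java method reactor.core.publisher.Flux.takeLast() and shows how Flux.takeLast() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as a practical reference. Details of the Flux.takeLast() method are as follows: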
Fully qualified name: reactor.core.publisher.Flux
Class name: Flux
Method name: takeLast
Description: Emit the last N values this Flux emitted before its completion.
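The description above can be modelled without Reactor at all. The following is a minimal plain-Java sketch of the contract, assuming a bounded buffer that evicts the oldest element once it holds n items. The class name `TakeLastSketch` and its static `takeLast` helper are hypothetical names chosen for illustration; this is not Reactor's operator code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model only: a plain-Java stand-in for the takeLast(n) contract,
// not Reactor's actual implementation.
public class TakeLastSketch {

    // Returns the last n elements of source in their original order.
    public static <T> List<T> takeLast(Iterable<T> source, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be >= 0 but was " + n);
        }
        Deque<T> buffer = new ArrayDeque<>();
        for (T item : source) {
            if (n == 0) {
                continue; // nothing is retained; the source is only drained
            }
            if (buffer.size() == n) {
                buffer.removeFirst(); // evict the oldest buffered element
            }
            buffer.addLast(item);
        }
        // Only once the source is exhausted is the tail known
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            source.add(i);
        }
        System.out.println(takeLast(source, 5));  // [6, 7, 8, 9, 10]
        System.out.println(takeLast(source, 20)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(takeLast(source, 0));  // []
    }
}
```

The three calls in `main` mirror the takeSome, takeAll, and takeNone cases among the examples on this page.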
Code example source: reactor/reactor-core
Flux<Integer> scenario_rangeTimedTake() {
    // Emit one value every 100 ms, stop after 4 s, then keep only the last value seen
    return Flux.range(1, Integer.MAX_VALUE)
            .delayElements(Duration.ofMillis(100))
            .take(Duration.ofSeconds(4))
            .takeLast(1);
}
Code example source: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void negativeNumber() {
    // A negative count is rejected as soon as the operator is applied
    Flux.never().takeLast(-1);
}
Code example source: reactor/reactor-core (the same snippet is also published in io.projectreactor/reactor-test)
@Override
public StepVerifier.Step<T> then() {
    return step.consumeSubscriptionWith(s -> {
        //the current subscription
        Scannable lowest = Scannable.from(s);
        //attempt to go back to the leftmost parent to check the Context from its perspective
        Context c = Flux.<Scannable>fromStream(lowest.parents())
                .ofType(CoreSubscriber.class)
                .takeLast(1)
                .singleOrEmpty()
                //no parent? must be a ScalarSubscription or similar source
                .switchIfEmpty(
                        Mono.just(lowest)
                                //unless it is directly a CoreSubscriber, let's try to scan the leftmost, see if it has an ACTUAL
                                .map(sc -> (sc instanceof CoreSubscriber) ?
                                        sc :
                                        sc.scanOrDefault(Scannable.Attr.ACTUAL, Scannable.from(null)))
                                //we are ultimately only interested in CoreSubscribers' Context
                                .ofType(CoreSubscriber.class)
                )
                .map(CoreSubscriber::currentContext)
                //if it wasn't a CoreSubscriber (eg. custom or vanilla Subscriber) there won't be a Context
                .block();
        this.contextExpectations.accept(c);
    });
}
Code example source: reactor/reactor-core
@Test
public void error() {
    // An error from the source is propagated through takeLast
    StepVerifier.create(Flux.error(new Exception("test"))
                    .takeLast(1))
            .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void empty() {
    Flux<?> f = Flux.empty()
            .takeLast(1);
    assertThat(f.getPrefetch()).isEqualTo(Integer.MAX_VALUE);
    StepVerifier.create(f)
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void illegal() {
    StepVerifier.create(Flux.empty()
                    .takeLast(-1))
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void normal() {
    StepVerifier.create(Flux.range(1, 100)
                    .takeLast(1))
            .expectNext(100)
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void normalHide() {
    StepVerifier.create(Flux.range(1, 100)
                    .hide()
                    .takeLast(1))
            .expectNext(100)
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeOne() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 10).takeLast(1).subscribe(ts);
    ts.assertValues(10)
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeSome() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 10).takeLast(5).subscribe(ts);
    ts.assertValues(6, 7, 8, 9, 10)
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeAll() {
    // Asking for more values than the source emits returns every value
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 10).takeLast(20).subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeNone() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    Flux.range(1, 10).takeLast(0).subscribe(ts);
    ts.assertNoValues()
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeNoneBackpressured() {
    // Initial request of 0: completion still arrives because there is nothing to deliver
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 10).takeLast(0).subscribe(ts);
    ts.assertNoValues()
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeOneBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 10).takeLast(1).subscribe(ts);
    ts.assertNoValues()
            .assertNotComplete()
            .assertNoError();
    ts.request(2);
    ts.assertValues(10)
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeAllBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 10).takeLast(20).subscribe(ts);
    ts.assertNoValues()
            .assertNotComplete()
            .assertNoError();
    ts.request(2);
    ts.assertValues(1, 2)
            .assertNotComplete()
            .assertNoError();
    ts.request(5);
    ts.assertValues(1, 2, 3, 4, 5, 6, 7)
            .assertNotComplete()
            .assertNoError();
    ts.request(10);
    ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .assertNoError()
            .assertComplete();
}
Code example source: reactor/reactor-core
@Test
public void takeSomeBackpressured() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
    Flux.range(1, 10).takeLast(5).subscribe(ts);
    ts.assertNoValues()
            .assertNotComplete()
            .assertNoError();
    ts.request(2);
    ts.assertValues(6, 7)
            .assertNotComplete()
            .assertNoError();
    ts.request(2);
    ts.assertValues(6, 7, 8, 9)
            .assertNotComplete()
            .assertNoError();
    ts.request(10);
    ts.assertValues(6, 7, 8, 9, 10)
            .assertNoError()
            .assertComplete();
}
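The backpressured tests above show that the retained tail is handed out only as the subscriber requests it. A rough plain-Java model of that behaviour follows, assuming the tail has already been buffered and is then drained on demand. `BackpressuredTailSketch`, `request`, and `isComplete` are hypothetical names for illustration and do not correspond to Reactor classes or methods.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper for illustration: models demand-driven draining of a
// buffered tail. It is not part of Reactor and ignores threading entirely.
public class BackpressuredTailSketch {

    private final Deque<Integer> tail = new ArrayDeque<>();

    // Buffers the last n values of the inclusive range [from, to].
    public BackpressuredTailSketch(int from, int to, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be >= 0 but was " + n);
        }
        for (int i = from; i <= to; i++) {
            if (n > 0) {
                if (tail.size() == n) {
                    tail.removeFirst(); // evict the oldest buffered value
                }
                tail.addLast(i);
            }
        }
    }

    // Delivers at most `demand` buffered values, oldest first.
    public List<Integer> request(long demand) {
        List<Integer> delivered = new ArrayList<>();
        for (long i = 0; i < demand && !tail.isEmpty(); i++) {
            delivered.add(tail.removeFirst());
        }
        return delivered;
    }

    // In this model, completion means the whole tail has been delivered.
    public boolean isComplete() {
        return tail.isEmpty();
    }

    public static void main(String[] args) {
        BackpressuredTailSketch sketch = new BackpressuredTailSketch(1, 10, 5);
        System.out.println(sketch.request(2));   // [6, 7]
        System.out.println(sketch.request(2));   // [8, 9]
        System.out.println(sketch.request(10));  // [10]
        System.out.println(sketch.isComplete()); // true
    }
}
```

The sequence of requests in `main` follows the same 2, 2, 10 pattern as takeSomeBackpressured, with each call returning only the newly delivered values rather than the cumulative list the test asserts on.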
Code example source: com.aol.cyclops/cyclops-reactor
/**
 * @param n
 * @return
 * @see reactor.core.publisher.Flux#takeLast(int)
 */
public final Flux<T> takeLast(int n) {
    return boxed.takeLast(n);
}
Code example source: com.aol.cyclops/cyclops-reactor
@Override
public ReactiveSeq<T> limitLast(int num) {
    return flux(flux.takeLast(num));
}
Code example source: hantsy/spring-reactive-sample
@Override
public Mono<Void> handle(WebSocketSession session) {
    String protocol = session.getHandshakeInfo().getSubProtocol();
    // Note: takeLast(0) emits no elements, and toString() here is called on the Flux itself, not on its elements
    WebSocketMessage message = session.textMessage(this.posts.findAll().takeLast(0).toString());
    return doSend(session, Mono.just(message));
}