Usage and code examples for the reactor.core.publisher.Flux.takeLast() method

x33g5p2x, reposted on 2022-01-19 under Other

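This article collects code examples for the Java method reactor.core.publisher.Flux.takeLast() and shows how Flux.takeLast() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. The details of Flux.takeLast() are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: takeLast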

Introduction to Flux.takeLast

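Emit the last N values this Flux emitted before its completion. As the tests collected under the code examples show, a negative N is rejected with an IllegalArgumentException, and a Flux that emits fewer than N values passes all of them through.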
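To make the description concrete, here is a minimal plain-Java sketch of the "keep only the last N values" idea, using a bounded ArrayDeque over an Iterable. It only models the observable behavior and is not Reactor's implementation; the class name TakeLastSketch and the method lastN are hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper: models "emit the last n values before completion"
// on a plain Iterable. This is NOT Reactor's implementation.
final class TakeLastSketch {

    static <T> List<T> lastN(Iterable<T> source, int n) {
        // Assumption: reject a negative count up front, as the negativeNumber test expects.
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        Deque<T> buffer = new ArrayDeque<>();
        for (T value : source) {
            if (n == 0) {
                continue; // nothing to retain, but the source is still drained
            }
            if (buffer.size() == n) {
                buffer.removeFirst(); // drop the oldest retained value
            }
            buffer.addLast(value);
        }
        // The buffer is released only after the source is exhausted ("completed").
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            source.add(i);
        }
        System.out.println(lastN(source, 5));  // [6, 7, 8, 9, 10]
        System.out.println(lastN(source, 20)); // all ten values, in order
        System.out.println(lastN(source, 0));  // []
    }
}
```

The point of the sketch is that nothing can be emitted until the source finishes, because only then is it known which values are the last N. This is also why takeLast on a source that never completes never emits anything.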

Code examples

Code example source: reactor/reactor-core

Flux<Integer> scenario_rangeTimedTake() {
  return Flux.range(1, Integer.MAX_VALUE)
        .delayElements(Duration.ofMillis(100))
        .take(Duration.ofSeconds(4))
        .takeLast(1);
}

Code example source: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void negativeNumber() {
  Flux.never().takeLast(-1);
}

Code example source: reactor/reactor-core

@Override
public StepVerifier.Step<T> then() {
  return step.consumeSubscriptionWith(s -> {
    //the current subscription
    Scannable lowest = Scannable.from(s);
    //attempt to go back to the leftmost parent to check the Context from its perspective
    Context c = Flux.<Scannable>
        fromStream(lowest.parents())
        .ofType(CoreSubscriber.class)
        .takeLast(1)
        .singleOrEmpty()
        //no parent? must be a ScalarSubscription or similar source
        .switchIfEmpty(
            Mono.just(lowest)
              //unless it is directly a CoreSubscriber, let's try to scan the leftmost, see if it has an ACTUAL
              .map(sc -> (sc instanceof CoreSubscriber) ?
                  sc :
                  sc.scanOrDefault(Scannable.Attr.ACTUAL, Scannable.from(null)))
              //we are ultimately only interested in CoreSubscribers' Context
              .ofType(CoreSubscriber.class)
        )
        .map(CoreSubscriber::currentContext)
        //if it wasn't a CoreSubscriber (eg. custom or vanilla Subscriber) there won't be a Context
        .block();
    this.contextExpectations.accept(c);
  });
}

Code example source: reactor/reactor-core

@Test
public void error() {
  StepVerifier.create(Flux.error(new Exception("test"))
              .takeLast(1))
        .verifyErrorMessage("test");
}

Code example source: reactor/reactor-core

@Test
public void empty() {
  Flux<?> f = Flux.empty()
          .takeLast(1);
  assertThat(f.getPrefetch()).isEqualTo(Integer.MAX_VALUE);
  StepVerifier.create(f)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void illegal() {
  StepVerifier.create(Flux.empty()
              .takeLast(-1))
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void normal() {
  StepVerifier.create(Flux.range(1, 100)
              .takeLast(1))
        .expectNext(100)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void normalHide() {
  StepVerifier.create(Flux.range(1, 100)
              .hide()
              .takeLast(1))
        .expectNext(100)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeOne() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10).takeLast(1).subscribe(ts);
  ts.assertValues(10)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeSome() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10).takeLast(5).subscribe(ts);
  ts.assertValues(6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeAll() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10).takeLast(20).subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeNone() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.range(1, 10).takeLast(0).subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeNoneBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10).takeLast(0).subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeOneBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10).takeLast(1).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(10)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeAllBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10).takeLast(20).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(1, 2)
   .assertNotComplete()
   .assertNoError();
  ts.request(5);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7)
   .assertNotComplete()
   .assertNoError();
  ts.request(10);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void takeSomeBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.range(1, 10).takeLast(5).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(6, 7)
   .assertNotComplete()
   .assertNoError();
  ts.request(2);
  ts.assertValues(6, 7, 8, 9)
   .assertNotComplete()
   .assertNoError();
  ts.request(10);
  ts.assertValues(6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}
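
The backpressured tests above suggest a two-phase pattern: the tail is buffered until the source completes, then handed out only as the subscriber requests it, with completion signalled once the buffer is drained. The following plain-Java sketch imitates that observable behavior under those assumptions. It is not Reactor's code, and BackpressuredTailSketch with its methods request, delivered, and isComplete are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of demand-driven delivery of the retained tail.
// It only imitates what the tests observe; it is NOT Reactor's code.
final class BackpressuredTailSketch<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<T> delivered = new ArrayList<>();
    private boolean complete;

    BackpressuredTailSketch(Iterable<T> source, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        for (T value : source) {
            if (n == 0) {
                continue; // retain nothing
            }
            if (buffer.size() == n) {
                buffer.removeFirst(); // evict the oldest retained value
            }
            buffer.addLast(value);
        }
        // With nothing retained there is no demand to wait for (compare takeNoneBackpressured).
        complete = buffer.isEmpty();
    }

    // Deliver at most 'demand' buffered values; complete once the buffer is drained.
    void request(long demand) {
        while (demand > 0 && !buffer.isEmpty()) {
            delivered.add(buffer.removeFirst());
            demand--;
        }
        if (buffer.isEmpty()) {
            complete = true;
        }
    }

    List<T> delivered() {
        return new ArrayList<>(delivered);
    }

    boolean isComplete() {
        return complete;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            source.add(i);
        }
        BackpressuredTailSketch<Integer> tail = new BackpressuredTailSketch<>(source, 5);
        tail.request(2);
        System.out.println(tail.delivered() + " " + tail.isComplete()); // [6, 7] false
        tail.request(2);
        System.out.println(tail.delivered() + " " + tail.isComplete()); // [6, 7, 8, 9] false
        tail.request(10);
        System.out.println(tail.delivered() + " " + tail.isComplete()); // [6, 7, 8, 9, 10] true
    }
}
```

The main method replays the request sequence of takeSomeBackpressured (2, then 2, then 10) against the model.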

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @param n the number of trailing elements to keep
 * @return a Flux that emits the last n elements of the wrapped Flux
 * @see reactor.core.publisher.Flux#takeLast(int)
 */
public final Flux<T> takeLast(int n) {
  return boxed.takeLast(n);
}

Code example source: com.aol.cyclops/cyclops-reactor

@Override
public ReactiveSeq<T> limitLast(int num) {
  return flux(flux.takeLast(num));
}

Code example source: hantsy/spring-reactive-sample

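@Override
public Mono<Void> handle(WebSocketSession session) {
  String protocol = session.getHandshakeInfo().getSubProtocol();
  // Note: toString() is invoked on the Flux returned by takeLast(0) itself, not on any post,
  // and takeLast(0) emits no values (compare the takeNone test above).
  WebSocketMessage message = session.textMessage(this.posts.findAll().takeLast(0).toString());
  return doSend(session, Mono.just(message));
}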
