reactor.core.publisher.Flux.buffer()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.0k)|赞(0)|评价(0)|浏览(284)

本文整理了Java中reactor.core.publisher.Flux.buffer()方法的一些代码示例,展示了Flux.buffer()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flux.buffer()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:buffer

Flux.buffer介绍

[英]Collect all incoming values into a single List buffer that will be emitted by the returned Flux once this Flux completes.
[中]将所有传入的值收集到单个List缓冲区中,该缓冲区将在此Flux完成后由返回的Flux发出。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Verifies that {@code buffer(2, 3)} cuts the source into two-element
 * buffers and that elements 3 and 6 end up in none of them.
 */
@Test
public void bufferWillSubdivideAnInputFlux() {
  Flux<Integer> source = Flux.just(1, 2, 3, 4, 5, 6, 7, 8);

  // "non overlapping buffers"
  Flux<List<Integer>> chunks = source.buffer(2, 3);
  // the no-arg buffer() gathers every chunk into one list, emitted on completion
  List<List<Integer>> collected = chunks.buffer()
                                        .blockLast();

  List<Integer> first = Arrays.asList(1, 2);
  List<Integer> second = Arrays.asList(4, 5);
  List<Integer> third = Arrays.asList(7, 8);
  assertThat(collected).containsExactly(first, second, third);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Scenario: eight integers, each delayed by 99 ms, grouped with the
 * two-duration {@code buffer} variant (200 ms and 300 ms).
 *
 * @return the buffered flux
 */
Flux<List<Integer>> scenario_bufferWillSubdivideAnInputFluxGapTime() {
  Flux<Integer> paced = Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                            .delayElements(Duration.ofMillis(99));
  // NOTE(review): per the Reactor API the first duration is the buffering
  // timespan and the second the period at which new buffers open - confirm
  Duration timespan = Duration.ofMillis(200);
  Duration openEvery = Duration.ofMillis(300);
  return paced.buffer(timespan, openEvery);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Scenario: eight integers, each delayed by 99 ms, grouped with the
 * single-duration {@code buffer} variant using 200 ms.
 *
 * @return the buffered flux
 */
Flux<List<Integer>> scenario_bufferWillSubdivideAnInputFluxTime() {
  Flux<Integer> paced = Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                            .delayElements(Duration.ofMillis(99));
  return paced.buffer(Duration.ofMillis(200));
}

代码示例来源:origin: reactor/reactor-core

/**
 * Scenario: same pipeline as the 200 ms time-based buffering case: eight
 * integers delayed by 99 ms each, then {@code buffer(Duration)} with 200 ms.
 *
 * @return the buffered flux
 */
Flux<List<Integer>> scenario_bufferWillSubdivideAnInputFluxTime2() {
  Flux<Integer> delayed = Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                              .delayElements(Duration.ofMillis(99));
  Duration window = Duration.ofMillis(200);
  return delayed.buffer(window);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Scenario: the range 1..4 is grouped into buffers of two, and each
 * resulting buffer is then delayed by 1000 ms.
 *
 * @return the delayed flux of buffers
 */
Flux<List<Integer>> scenario_delayItems() {
  Flux<List<Integer>> pairs = Flux.range(1, 4)
                                  .buffer(2);
  // the delay applies to the buffers, not to the individual integers
  return pairs.delayElements(Duration.ofMillis(1000));
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxEmptyBuffer() {
    // "flux empty buffer"
    // when: an empty source is buffered against a boundary created with Flux.never()
    Flux<List<Object>> buffered = Flux.empty()
                                      .buffer(Flux.never());
    List<List<Object>> result = buffered.collectList()
                                        .block(Duration.ofMillis(100));

    // then: the collected list contains no buffers
    assertThat(result).isEmpty();
  }

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxEmptyBufferJust() {
    // "flux empty buffer just"
    // when: an empty source is buffered against a boundary created with Flux.just(1)
    Flux<List<Object>> buffered = Flux.empty()
                                      .buffer(Flux.just(1));
    List<List<Object>> result = buffered.collectList()
                                        .block();

    // then: the collected list contains no buffers
    assertThat(result).isEmpty();
  }

代码示例来源:origin: reactor/reactor-core

/**
 * Supplies the scenarios in which the operator itself is expected to fail.
 * Both buffer the input with a {@code Flux.never()} boundary and differ only
 * in how the second argument (the buffer supplier) misbehaves.
 *
 * @return two scenarios: supplier returning {@code null}, supplier throwing
 */
@Override
protected List<Scenario<String, List<String>>> scenarios_operatorError() {
  return Arrays.asList(
      // supplier hands back null instead of a buffer
      scenario(f -> f.buffer(Flux.never(), () -> null)),
      // supplier throws the exception produced by exception()
      // NOTE(review): exception() is defined outside this snippet - confirm what it returns
      scenario(f -> f.buffer(Flux.never(), () -> {
        throw exception();
      })));
}

代码示例来源:origin: reactor/reactor-core

/**
 * Buffers the range 1..10 in groups of two and expects five full pairs
 * followed by normal completion.
 */
@Test
public void normalExact() {
  AssertSubscriber<List<Integer>> subscriber = AssertSubscriber.create();

  Flux<List<Integer>> pairs = Flux.range(1, 10)
                                  .buffer(2);
  pairs.subscribe(subscriber);

  subscriber.assertValues(
                Arrays.asList(1, 2),
                Arrays.asList(3, 4),
                Arrays.asList(5, 6),
                Arrays.asList(7, 8),
                Arrays.asList(9, 10))
            .assertComplete()
            .assertNoError();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Supplies the scenarios in which the boundary-based {@code buffer} is
 * expected to succeed.
 *
 * @return two scenarios: a {@code Mono.never()} boundary and a
 *         {@code Mono.just(1)} boundary
 */
@Override
protected List<Scenario<String, List<String>>> scenarios_operatorSuccess() {
  // Mono.never() boundary: the received buffer must hold exactly item(0), item(1), item(2)
  return Arrays.asList(scenario(f -> f.buffer(Mono.never()))
          .receive(i -> assertThat(i).containsExactly(item(0), item(1), item(2)))
      .shouldAssertPostTerminateState(false),
      // Mono.just(1) boundary
      // NOTE(review): receiverEmpty() presumably means nothing is expected downstream - confirm
      scenario(f -> f.buffer(Mono.just(1)))
          .receiverEmpty()
          .shouldAssertPostTerminateState(false)
  );
}

代码示例来源:origin: reactor/reactor-core

/**
 * Checks the PARENT and PREFETCH attributes reported by a
 * {@code ParallelMergeSort} built on top of a buffered, parallelised flux.
 */
@Test
public void scanOperator() {
  ParallelFlux<List<Integer>> upstream = Flux.just(500, 300)
                                             .buffer(1)
                                             .parallel(10);
  ParallelMergeSort<Integer> mergeSort =
      new ParallelMergeSort<>(upstream, Integer::compareTo);

  // PARENT must be the very instance passed to the constructor
  assertThat(mergeSort.scan(Scannable.Attr.PARENT)).isSameAs(upstream);
  assertThat(mergeSort.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE);
}

代码示例来源:origin: reactor/reactor-core

/**
 * With {@code buffer(2, 3)} the third element belongs to no buffer; the test
 * expects it to be reported as discarded.
 */
@Test
public void discardOnSkippedElements() {
  // flatten the buffers again so the surviving elements can be asserted one by one
  Flux<Integer> flattened = Flux.just(1, 2, 3, 4, 5)
                                .buffer(2, 3)
                                .flatMapIterable(Function.identity());

  // the skip flavor should discard elements that are not added to any buffer
  StepVerifier.create(flattened)
              .expectNext(1, 2, 4, 5)
              .expectComplete()
              .verifyThenAssertThat()
              .hasDiscardedExactly(3);
}

代码示例来源:origin: reactor/reactor-core

/**
 * The source errors after three elements, before a buffer of size four can
 * fill; the test expects the three pending elements to be discarded.
 */
@Test
public void discardOnError() {
  Flux<Integer> failing = Flux.just(1, 2, 3)
                              .concatWith(Mono.error(new IllegalStateException("boom")));

  StepVerifier.create(failing.buffer(4))
              .expectErrorMessage("boom")
              .verifyThenAssertThat()
              .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Overlapping variant ({@code buffer(4, 2)}) of the error case: element 3 is
 * expected to be discarded twice.
 */
@Test
    public void discardOnErrorOverlap() {
        Flux<Integer> failing = Flux.just(1, 2, 3)
                                    .concatWith(Mono.error(new IllegalStateException("boom")));

        StepVerifier.create(failing.buffer(4, 2))
                    .expectErrorMessage("boom")
                    .verifyThenAssertThat()
                    // we already opened a 2nd buffer, hence the second 3
                    .hasDiscardedExactly(1, 2, 3, 3);
    }
}

代码示例来源:origin: reactor/reactor-core

/**
 * A buffer supplier that always throws must surface its exception as an
 * error signal: no values, no completion.
 */
@Test
public void supplierThrows() {
  AssertSubscriber<Object> subscriber = AssertSubscriber.create();

  Flux.range(1, 10)
      .buffer(2, 1, () -> {
        throw new RuntimeException("forced failure");
      })
      .subscribe(subscriber);

  subscriber.assertNoValues()
            .assertError(RuntimeException.class)
            .assertErrorMessage("forced failure")
            .assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * The source emits three elements and then never completes; after the
 * subscription is cancelled the test expects those three elements to be
 * discarded from the unfinished size-four buffer.
 */
@Test
public void discardOnCancel() {
  Flux<Integer> neverEnding = Flux.just(1, 2, 3)
                                  .concatWith(Mono.never());

  StepVerifier.create(neverEnding.buffer(4))
              .thenAwait(Duration.ofMillis(10))
              .thenCancel()
              .verifyThenAssertThat()
              .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Boundary-based variant: with a {@code Mono.never()} boundary, the source
 * errors after three elements and the test expects all three to be discarded.
 */
@Test
public void discardOnError() {
  Flux<Integer> failing = Flux.just(1, 2, 3)
                              .concatWith(Mono.error(new IllegalStateException("boom")));

  StepVerifier.create(failing.buffer(Mono.never()))
              .expectErrorMessage("boom")
              .verifyThenAssertThat()
              .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Boundary-based variant: with a {@code Mono.never()} boundary and a source
 * that never completes, cancelling is expected to discard the three
 * elements emitted so far.
 */
@Test
public void discardOnCancel() {
  Flux<Integer> neverEnding = Flux.just(1, 2, 3)
                                  .concatWith(Mono.never());

  StepVerifier.create(neverEnding.buffer(Mono.never()))
              .thenAwait(Duration.ofMillis(10))
              .thenCancel()
              .verifyThenAssertThat()
              .hasDiscardedExactly(1, 2, 3);
}

代码示例来源:origin: reactor/reactor-core

/**
 * A buffer supplier that returns {@code null} must produce a
 * {@link NullPointerException} error signal: no values, no completion.
 */
@Test
public void supplierReturnsNull() {
  AssertSubscriber<Object> subscriber = AssertSubscriber.create();

  Flux.range(1, 10)
      .buffer(2, 1, () -> null)
      .subscribe(subscriber);

  subscriber.assertNoValues()
            .assertError(NullPointerException.class)
            .assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Overlapping variant ({@code buffer(4, 2)}) of the cancel case: the source is
 * limited with {@code limitRequest(2)} and then never completes; after
 * cancellation the test expects exactly elements 1 and 2 to be discarded.
 */
@Test
public void discardOnCancelOverlap() {
  Flux<Integer> neverEnding = Flux.just(1, 2, 3, 4, 5, 6)
                                  .limitRequest(2)
                                  .concatWith(Mono.never());

  StepVerifier.create(neverEnding.buffer(4, 2))
              .thenAwait(Duration.ofMillis(10))
              .thenCancel()
              .verifyThenAssertThat()
              .hasDiscardedExactly(1, 2);
}

相关文章

Flux类方法