Usage and code examples of the reactor.core.publisher.Flux.doOnDiscard() method

x33g5p2x, reposted on 2022-01-19 under Other

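This article collects code examples of the Java method reactor.core.publisher.Flux.doOnDiscard() and shows how Flux.doOnDiscard() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. The details of Flux.doOnDiscard() are as follows:
Package: reactor.core.publisher
Class name: Flux
Method name: doOnDiscard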

Introduction to Flux.doOnDiscard

Modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.

The discardHook must be idempotent and safe to use on any instance of the desired type. Calls to this method are additive, and the order of invocation of the discardHook is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second) will let the filter invoke first then second handlers).

Two main categories of discarding operators exist:

  • filtering operators, dropping some source elements as part of their designed behavior
  • operators that prefetch a few elements and keep them around pending a request, but get cancelled/in error

These operators are identified in the javadoc by the presence of an onDiscard Support section.
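
The additive, type-filtered behavior described above can be modeled in a few lines of plain Java. The sketch below is not Reactor's implementation and does not use the Reactor library; DiscardHookModel, empty(), discard() and runDemo() are hypothetical names introduced only for illustration. It assumes that each registered hook is wrapped in a type check and chained after the hooks declared before it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified plain-Java model of the documented semantics.
// Assumption: this is NOT Reactor's code, only an illustration.
final class DiscardHookModel {

    // The combined hook built so far; starts as a no-op.
    private final Consumer<Object> hook;

    private DiscardHookModel(Consumer<Object> hook) {
        this.hook = hook;
    }

    static DiscardHookModel empty() {
        return new DiscardHookModel(o -> { });
    }

    // Additive registration: the new hook runs after those already declared,
    // and only for elements that are instances of the requested type.
    <R> DiscardHookModel doOnDiscard(Class<R> type, Consumer<? super R> discardHook) {
        Consumer<Object> typed = o -> {
            if (type.isInstance(o)) {
                discardHook.accept(type.cast(o));
            }
        };
        return new DiscardHookModel(hook.andThen(typed));
    }

    // What a discarding operator would call for each element it drops.
    void discard(Object element) {
        hook.accept(element);
    }

    // Hypothetical demo: a "filter" that keeps even numbers from 1..2.
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        DiscardHookModel hooks = DiscardHookModel.empty()
                .doOnDiscard(Number.class, n -> log.add("FIRST:" + n))
                .doOnDiscard(Integer.class, i -> log.add("SECOND:" + i))
                .doOnDiscard(String.class, s -> log.add("NEVER:" + s));
        for (int i = 1; i <= 2; i++) {
            if (i % 2 != 0) {
                hooks.discard(i); // the filter rejected this element
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Running main prints [FIRST:1, SECOND:1]: the rejected element 1 reaches the Number hook first and the Integer hook second, while the String hook never fires because the element is not a String. In real Reactor code the corresponding call is Flux.doOnDiscard(Class, Consumer), as used in the project examples that follow.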

Code examples

Code example source: spring-projects/spring-framework (also listed under org.springframework/spring-core)

/**
 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readByteChannel(
    Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  return Flux.using(channelSupplier,
      channel -> {
        ReadableByteChannelGenerator generator =
            new ReadableByteChannelGenerator(channel, dataBufferFactory,
                bufferSize);
        return Flux.generate(generator);
      },
      DataBufferUtils::closeChannel)
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

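Code example source: spring-projects/spring-framework (also listed under org.springframework/spring-core and org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-core)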

@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
  List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
  Flux<DataBuffer> inputFlux = Flux.from(inputStream)
      .flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
      .bufferUntil(StringDecoder::isEndFrame)
      .map(StringDecoder::joinUntilEndFrame)
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  return super.decode(inputFlux, elementType, mimeType, hints);
}

Code example source: spring-projects/spring-framework (also listed under org.springframework/spring-core)

}).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

Code example source: spring-projects/spring-framework (also listed under org.springframework/spring-core)

/**
 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
 * channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
    long position, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(position >= 0, "'position' must be >= 0");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
  ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
  Flux<DataBuffer> result = Flux.using(channelSupplier,
      channel -> Flux.create(sink -> {
        AsynchronousFileChannelReadCompletionHandler completionHandler =
            new AsynchronousFileChannelReadCompletionHandler(channel,
                sink, position, dataBufferFactory, bufferSize);
        channel.read(byteBuffer, position, dataBuffer, completionHandler);
        sink.onDispose(completionHandler::dispose);
      }),
      DataBufferUtils::closeChannel);
  return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

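Code example source: spring-projects/spring-framework (also listed under org.springframework/spring-web and org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-web)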

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
  Flux<ContentChunk> chunks = Flux.from(body)
      .flatMap(Function.identity())
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
      .map(this::toContentChunk);
  ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
  this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
  return doCommit(this::completes);
}

Code example source: spring-projects/spring-framework (also listed under org.springframework/spring-web)

encodeData(data, valueType, mediaType, factory, hints),
      encodeText("\n", mediaType, factory))
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
});

Code example source: reactor/reactor-core

@Test
public void discardLocalOrder() {
  List<String> discardOrder = Collections.synchronizedList(new ArrayList<>(2));
  StepVerifier.create(Flux.range(1, 2)
              .hide() //hide both avoid the fuseable AND tryOnNext usage
              .filter(i -> i % 2 == 0)
              .doOnDiscard(Number.class, i -> discardOrder.add("FIRST"))
              .doOnDiscard(Integer.class, i -> discardOrder.add("SECOND"))
  )
        .expectNext(2)
        .expectComplete()
        .verify();
  Assertions.assertThat(discardOrder).containsExactly("FIRST", "SECOND");
}

Code example source: reactor/reactor-core

@Test
public void discardLocalMultipleFilters() {
  AtomicInteger discardNumberCount = new AtomicInteger();
  AtomicInteger discardStringCount = new AtomicInteger();
  StepVerifier.create(Flux.range(1, 12)
              .hide() //hide both avoid the fuseable AND tryOnNext usage
              .filter(i -> i % 2 == 0)
              .map(String::valueOf)
              .filter(s -> s.length() < 2)
              .doOnDiscard(Number.class, i -> discardNumberCount.incrementAndGet())
              .doOnDiscard(String.class, i -> discardStringCount.incrementAndGet())
  )
        .expectNext("2", "4", "6", "8")
        .expectComplete()
        .verify();
  Assertions.assertThat(discardNumberCount).hasValue(6); //1 3 5 7 9 11
  Assertions.assertThat(discardStringCount).hasValue(2); //10 12
}

Code example source: reactor/reactor-core

@Test
public void shouldBeAbleToCatchDiscardedElement() {
  TestPublisher<Integer> publisher = TestPublisher.createCold();
  Integer[] discarded = new Integer[1];
  Flux<String> switchTransformed = publisher.flux()
                       .switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf))
                       .doOnDiscard(Integer.class, e -> discarded[0] = e);
  publisher.next(1);
  StepVerifier.create(switchTransformed, 0)
        .thenCancel()
        .verify(Duration.ofSeconds(10));
  publisher.assertCancelled();
  publisher.assertWasRequested();
  Assertions.assertThat(discarded).containsExactly(1);
}

Code example source: reactor/reactor-core

@Test
public void shouldBeAbleToCatchDiscardedElementInCaseOfConditional() {
  TestPublisher<Integer> publisher = TestPublisher.createCold();
  Integer[] discarded = new Integer[1];
  Flux<String> switchTransformed = publisher.flux()
                       .switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf))
                       .filter(t -> true)
                       .doOnDiscard(Integer.class, e -> discarded[0] = e);
  publisher.next(1);
  StepVerifier.create(switchTransformed, 0)
        .thenCancel()
        .verify(Duration.ofSeconds(10));
  publisher.assertCancelled();
  publisher.assertWasRequested();
  Assertions.assertThat(discarded).contains(1);
}

Code example source: rsocket/rsocket-java

@Test
public void shouldBeAbleToCatchDiscardedElement() {
 TestPublisher<Integer> publisher = TestPublisher.createCold();
 Integer[] discarded = new Integer[1];
 Flux<String> switchTransformed =
   publisher
     .flux()
     .transform(
       flux ->
         new SwitchTransformFlux<>(
           flux, (first, innerFlux) -> innerFlux.map(String::valueOf)))
     .doOnDiscard(Integer.class, e -> discarded[0] = e);
 publisher.next(1);
 StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10));
 publisher.assertCancelled();
 publisher.assertWasRequested();
 Assert.assertArrayEquals(new Integer[] {1}, discarded);
}

Code example source: rsocket/rsocket-java

@Test
public void shouldBeAbleToCatchDiscardedElementInCaseOfConditional() {
 TestPublisher<Integer> publisher = TestPublisher.createCold();
 Integer[] discarded = new Integer[1];
 Flux<String> switchTransformed =
   publisher
     .flux()
     .transform(
       flux ->
         new SwitchTransformFlux<>(
           flux, (first, innerFlux) -> innerFlux.map(String::valueOf)))
     .filter(t -> true)
     .doOnDiscard(Integer.class, e -> discarded[0] = e);
 publisher.next(1);
 StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10));
 publisher.assertCancelled();
 publisher.assertWasRequested();
 Assert.assertArrayEquals(new Integer[] {1}, discarded);
}
