本文整理了Java中reactor.core.publisher.Flux.doOnDiscard()
方法的一些代码示例,展示了Flux.doOnDiscard()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.doOnDiscard()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:doOnDiscard
[英]Modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.
The discardHook must be idempotent and safe to use on any instance of the desired type. Calls to this method are additive, and the order of invocation of the discardHookis the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second)will let the filter invoke first then second handlers).
Two main categories of discarding operators exist:
代码示例来源:origin: spring-projects/spring-framework
/**
* Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
* @param channelSupplier the supplier for the channel to read from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
*/
public static Flux<DataBuffer> readByteChannel(
Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
return Flux.using(channelSupplier,
channel -> {
ReadableByteChannelGenerator generator =
new ReadableByteChannelGenerator(channel, dataBufferFactory,
bufferSize);
return Flux.generate(generator);
},
DataBufferUtils::closeChannel)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
Flux<DataBuffer> inputFlux = Flux.from(inputStream)
.flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
.bufferUntil(StringDecoder::isEndFrame)
.map(StringDecoder::joinUntilEndFrame)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return super.decode(inputFlux, elementType, mimeType, hints);
}
代码示例来源:origin: spring-projects/spring-framework
}).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
代码示例来源:origin: org.springframework/spring-core
/**
* Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
* @param channelSupplier the supplier for the channel to read from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
*/
public static Flux<DataBuffer> readByteChannel(
Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
return Flux.using(channelSupplier,
channel -> {
ReadableByteChannelGenerator generator =
new ReadableByteChannelGenerator(channel, dataBufferFactory,
bufferSize);
return Flux.generate(generator);
},
DataBufferUtils::closeChannel)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
代码示例来源:origin: spring-projects/spring-framework
/**
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
* {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
* channel when the flux is terminated.
* @param channelSupplier the supplier for the channel to read from
* @param position the position to start reading from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
*/
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
long position, DataBufferFactory dataBufferFactory, int bufferSize) {
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.isTrue(position >= 0, "'position' must be >= 0");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
Flux<DataBuffer> result = Flux.using(channelSupplier,
channel -> Flux.create(sink -> {
AsynchronousFileChannelReadCompletionHandler completionHandler =
new AsynchronousFileChannelReadCompletionHandler(channel,
sink, position, dataBufferFactory, bufferSize);
channel.read(byteBuffer, position, dataBuffer, completionHandler);
sink.onDispose(completionHandler::dispose);
}),
DataBufferUtils::closeChannel);
return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
代码示例来源:origin: org.springframework/spring-core
@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
Flux<DataBuffer> inputFlux = Flux.from(inputStream)
.flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
.bufferUntil(StringDecoder::isEndFrame)
.map(StringDecoder::joinUntilEndFrame)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return super.decode(inputFlux, elementType, mimeType, hints);
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Flux<ContentChunk> chunks = Flux.from(body)
.flatMap(Function.identity())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
.map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
代码示例来源:origin: org.springframework/spring-core
}).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
代码示例来源:origin: org.springframework/spring-web
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Flux<ContentChunk> chunks = Flux.from(body)
.flatMap(Function.identity())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
.map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
代码示例来源:origin: org.springframework/spring-core
/**
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
* {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
* channel when the flux is terminated.
* @param channelSupplier the supplier for the channel to read from
* @param position the position to start reading from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
*/
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
long position, DataBufferFactory dataBufferFactory, int bufferSize) {
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.isTrue(position >= 0, "'position' must be >= 0");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
Flux<DataBuffer> result = Flux.using(channelSupplier,
channel -> Flux.create(sink -> {
AsynchronousFileChannelReadCompletionHandler completionHandler =
new AsynchronousFileChannelReadCompletionHandler(channel,
sink, position, dataBufferFactory, bufferSize);
channel.read(byteBuffer, position, dataBuffer, completionHandler);
sink.onDispose(completionHandler::dispose);
}),
DataBufferUtils::closeChannel);
return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
代码示例来源:origin: spring-projects/spring-framework
encodeData(data, valueType, mediaType, factory, hints),
encodeText("\n", mediaType, factory))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
});
代码示例来源:origin: reactor/reactor-core
@Test
public void discardLocalOrder() {
List<String> discardOrder = Collections.synchronizedList(new ArrayList<>(2));
StepVerifier.create(Flux.range(1, 2)
.hide() //hide both avoid the fuseable AND tryOnNext usage
.filter(i -> i % 2 == 0)
.doOnDiscard(Number.class, i -> discardOrder.add("FIRST"))
.doOnDiscard(Integer.class, i -> discardOrder.add("SECOND"))
)
.expectNext(2)
.expectComplete()
.verify();
Assertions.assertThat(discardOrder).containsExactly("FIRST", "SECOND");
}
代码示例来源:origin: org.springframework/spring-web
encodeData(data, valueType, mediaType, factory, hints),
encodeText("\n", mediaType, factory))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
});
代码示例来源:origin: reactor/reactor-core
@Test
public void discardLocalMultipleFilters() {
AtomicInteger discardNumberCount = new AtomicInteger();
AtomicInteger discardStringCount = new AtomicInteger();
StepVerifier.create(Flux.range(1, 12)
.hide() //hide both avoid the fuseable AND tryOnNext usage
.filter(i -> i % 2 == 0)
.map(String::valueOf)
.filter(s -> s.length() < 2)
.doOnDiscard(Number.class, i -> discardNumberCount.incrementAndGet())
.doOnDiscard(String.class, i -> discardStringCount.incrementAndGet())
)
.expectNext("2", "4", "6", "8")
.expectComplete()
.verify();
Assertions.assertThat(discardNumberCount).hasValue(6); //1 3 5 7 9 11
Assertions.assertThat(discardStringCount).hasValue(2); //10 12
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldBeAbleToCatchDiscardedElement() {
TestPublisher<Integer> publisher = TestPublisher.createCold();
Integer[] discarded = new Integer[1];
Flux<String> switchTransformed = publisher.flux()
.switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf))
.doOnDiscard(Integer.class, e -> discarded[0] = e);
publisher.next(1);
StepVerifier.create(switchTransformed, 0)
.thenCancel()
.verify(Duration.ofSeconds(10));
publisher.assertCancelled();
publisher.assertWasRequested();
Assertions.assertThat(discarded).containsExactly(1);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shouldBeAbleToCatchDiscardedElementInCaseOfConditional() {
TestPublisher<Integer> publisher = TestPublisher.createCold();
Integer[] discarded = new Integer[1];
Flux<String> switchTransformed = publisher.flux()
.switchOnFirst((first, innerFlux) -> innerFlux.map(String::valueOf))
.filter(t -> true)
.doOnDiscard(Integer.class, e -> discarded[0] = e);
publisher.next(1);
StepVerifier.create(switchTransformed, 0)
.thenCancel()
.verify(Duration.ofSeconds(10));
publisher.assertCancelled();
publisher.assertWasRequested();
Assertions.assertThat(discarded).contains(1);
}
代码示例来源:origin: rsocket/rsocket-java
@Test
public void shouldBeAbleToCatchDiscardedElement() {
TestPublisher<Integer> publisher = TestPublisher.createCold();
Integer[] discarded = new Integer[1];
Flux<String> switchTransformed =
publisher
.flux()
.transform(
flux ->
new SwitchTransformFlux<>(
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)))
.doOnDiscard(Integer.class, e -> discarded[0] = e);
publisher.next(1);
StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10));
publisher.assertCancelled();
publisher.assertWasRequested();
Assert.assertArrayEquals(new Integer[] {1}, discarded);
}
代码示例来源:origin: rsocket/rsocket-java
@Test
public void shouldBeAbleToCatchDiscardedElementInCaseOfConditional() {
TestPublisher<Integer> publisher = TestPublisher.createCold();
Integer[] discarded = new Integer[1];
Flux<String> switchTransformed =
publisher
.flux()
.transform(
flux ->
new SwitchTransformFlux<>(
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)))
.filter(t -> true)
.doOnDiscard(Integer.class, e -> discarded[0] = e);
publisher.next(1);
StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10));
publisher.assertCancelled();
publisher.assertWasRequested();
Assert.assertArrayEquals(new Integer[] {1}, discarded);
}
}
代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-core
@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
Flux<DataBuffer> inputFlux = Flux.from(inputStream)
.flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
.bufferUntil(StringDecoder::isEndFrame)
.map(StringDecoder::joinUntilEndFrame)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return super.decode(inputFlux, elementType, mimeType, hints);
}
代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-web
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Flux<ContentChunk> chunks = Flux.from(body)
.flatMap(Function.identity())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
.map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
内容来源于网络,如有侵权,请联系作者删除!