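This article collects code examples for the Java method reactor.core.publisher.Flux.using() and shows how Flux.using() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. The details of Flux.using() are as follows: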
Package: reactor.core.publisher
Class name: Flux
Method name: using
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource, and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Eager resource cleanup happens just before the source termination, and exceptions raised by the cleanup Consumer may override the terminal event.
For an asynchronous version of the cleanup, with distinct paths for onComplete, onError and cancel terminations, see #usingWhen(Publisher, Function, Function, Function, Function).
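The ordering described above (cleanup just before the terminal signal when eager, after it otherwise) can be illustrated with a small plain-Java model. This is only a sketch of the contract, not Reactor's implementation: the class `UsingModel`, the `Listener` interface, and the `trace` helper are hypothetical names introduced here, an `Iterable` stands in for the `Publisher`, and cancellation is not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

public class UsingModel {

    /** Hypothetical stand-in for a Subscriber: values, then exactly one terminal signal. */
    public interface Listener<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    /** Simplified, synchronous model of the using() contract (an assumption, not Reactor code). */
    public static <T, D> void using(Callable<? extends D> resourceSupplier,
                                    Function<? super D, ? extends Iterable<? extends T>> sourceSupplier,
                                    Consumer<? super D> resourceCleanup,
                                    boolean eager,
                                    Listener<? super T> listener) {
        D resource;
        try {
            resource = resourceSupplier.call();      // one resource per "subscription"
        } catch (Exception e) {
            listener.onError(e);                     // nothing was acquired, so no cleanup
            return;
        }
        Throwable failure = null;
        try {
            for (T value : sourceSupplier.apply(resource)) {
                listener.onNext(value);
            }
        } catch (RuntimeException e) {
            failure = e;                             // the source failed
        }
        if (eager) {
            try {
                resourceCleanup.accept(resource);    // eager: cleanup BEFORE the terminal signal
            } catch (RuntimeException e) {
                failure = e;                         // a cleanup failure replaces the terminal signal
            }
        }
        if (failure != null) {
            listener.onError(failure);
        } else {
            listener.onComplete();
        }
        if (!eager) {
            resourceCleanup.accept(resource);        // non-eager: cleanup AFTER the terminal signal
        }
    }

    /** Records the order of events for the given eager flag. */
    public static List<String> trace(boolean eager) {
        List<String> events = new ArrayList<>();
        using(() -> "resource",
              r -> Arrays.asList(1, 2, 3),
              r -> events.add("cleanup"),
              eager,
              new Listener<Integer>() {
                  @Override public void onNext(Integer v) { events.add("next " + v); }
                  @Override public void onError(Throwable e) { events.add("error"); }
                  @Override public void onComplete() { events.add("complete"); }
              });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));   // [next 1, next 2, next 3, cleanup, complete]
        System.out.println(trace(false));  // [next 1, next 2, next 3, complete, cleanup]
    }
}
```

The only difference between the two traces is where "cleanup" lands relative to "complete", which is the distinction the eager flag controls.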
Code example source: origin: reactor/reactor-core
/**
 * Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
 * Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
 * the Subscriber cancels.
 * <p>
 * Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer
 * may override the terminal event.
 * <p>
 * <img class="marble" src="doc-files/marbles/usingForFlux.svg" alt="">
 * <p>
 * For an asynchronous version of the cleanup, with distinct path for onComplete, onError
 * and cancel terminations, see {@link #usingWhen(Publisher, Function, Function, Function, Function)}.
 *
 * @param resourceSupplier a {@link Callable} that is called on subscribe to generate the resource
 * @param sourceSupplier a factory to derive a {@link Publisher} from the supplied resource
 * @param resourceCleanup a resource cleanup callback invoked on completion
 * @param <T> emitted type
 * @param <D> resource type
 *
 * @return a new {@link Flux} built around a disposable resource
 * @see #usingWhen(Publisher, Function, Function, Function, Function)
 * @see #usingWhen(Publisher, Function, Function)
 */
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends
        Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup) {
    // The three-argument overload delegates to the four-argument one with eager cleanup enabled.
    return using(resourceSupplier, sourceSupplier, resourceCleanup, true);
}
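Since the three-argument overload passes `true` for the eager flag, it may help to see both settings side by side. The following is a minimal sketch that assumes reactor-core is on the classpath; the class name `UsingEagerDemo` and the helper `run` are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;

public class UsingEagerDemo {

    /** Subscribes once and records the order of emitted values, cleanup, and completion. */
    public static List<String> run(boolean eager) {
        List<String> events = new CopyOnWriteArrayList<>();
        Flux.using(
                () -> "resource",                // resource created for this subscriber
                r -> Flux.just(1, 2, 3),         // publisher derived from the resource
                r -> events.add("cleanup"),      // cleanup callback
                eager)                           // true is what the three-argument overload passes
            .subscribe(
                v -> events.add("next " + v),
                e -> events.add("error"),
                () -> events.add("complete"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("eager=true : " + run(true));   // cleanup is recorded before complete
        System.out.println("eager=false: " + run(false));  // cleanup is recorded after complete
    }
}
```

With eager cleanup the subscriber only sees the terminal signal once the resource has been released, which is usually the safer default; the non-eager variant lets the terminal signal through first.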
Code example source: origin: spring-projects/spring-framework
/**
 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readByteChannel(
        Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
    Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
    Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
    Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
    return Flux.using(channelSupplier,
            channel -> {
                ReadableByteChannelGenerator generator =
                        new ReadableByteChannelGenerator(channel, dataBufferFactory,
                                bufferSize);
                return Flux.generate(generator);
            },
            DataBufferUtils::closeChannel)
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
Code example source: origin: spring-projects/spring-framework
/**
 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
 * channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
        long position, DataBufferFactory dataBufferFactory, int bufferSize) {
    Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
    Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
    Assert.isTrue(position >= 0, "'position' must be >= 0");
    Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
    DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
    ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
    Flux<DataBuffer> result = Flux.using(channelSupplier,
            channel -> Flux.create(sink -> {
                AsynchronousFileChannelReadCompletionHandler completionHandler =
                        new AsynchronousFileChannelReadCompletionHandler(channel,
                                sink, position, dataBufferFactory, bufferSize);
                channel.read(byteBuffer, position, dataBuffer, completionHandler);
                sink.onDispose(completionHandler::dispose);
            }),
            DataBufferUtils::closeChannel);
    return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
Code example source: origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void sourceFactoryNull() {
    Flux.using(() -> 1, null, r -> {
    }, false);
}
Code example source: origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
    return Arrays.asList(
            scenario(f -> Flux.using(() -> 0, c -> f, c -> {})),
            scenario(f -> Flux.using(() -> 0, c -> f, c -> {}, false))
                    .shouldAssertPostTerminateState(false)
    );
}
Code example source: origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void resourceSupplierNull() {
    Flux.using(null, r -> Flux.empty(), r -> {
    }, false);
}
Code example source: origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void resourceCleanupNull() {
    Flux.using(() -> 1, r -> Flux.empty(), null, false);
}
Code example source: origin: reactor/reactor-core
@Test
public void errorHandlingUsing() {
    AtomicBoolean isDisposed = new AtomicBoolean();
    Disposable disposableInstance = new Disposable() {
        @Override
        public void dispose() {
            isDisposed.set(true); // set once the cleanup callback has run
        }
        @Override
        public String toString() {
            return "DISPOSABLE";
        }
    };
    Flux<String> flux =
            Flux.using(
                    () -> disposableInstance, // generates the resource (here, the mock Disposable)
                    disposable -> Flux.just(disposable.toString()), // derives a Flux from the resource
                    Disposable::dispose // called when that Flux terminates or is cancelled
            );
    StepVerifier.create(flux)
            .expectNext("DISPOSABLE")
            .verifyComplete();
    assertThat(isDisposed.get()).isTrue();
}
Code example source: origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorError() {
    return Arrays.asList(
            scenario(f -> Flux.using(() -> {
                throw exception();
            }, c -> f, c -> {}))
                    .fusionMode(Fuseable.NONE),
            scenario(f -> Flux.using(() -> 0, c -> null, c -> {}))
                    .fusionMode(Fuseable.NONE),
            scenario(f -> Flux.using(() -> 0, c -> {
                throw exception();
            }, c -> {}))
                    .fusionMode(Fuseable.NONE)
            /*scenario(f -> Flux.using(() -> 0, c -> f, c -> {
                throw exception();
            }))
                    .verifier(step -> {
                        try {
                            step.expectNext(item(0), item(1), item(2)).verifyErrorMessage
                                    ("test");
                        }
                        catch (Exception t){
                            assertThat(Exceptions.unwrap(t)).hasMessage("test");
                        }
                    })*/
    );
}
Code example source: origin: reactor/reactor-core
@Test
public void normalEager() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    AtomicInteger cleanup = new AtomicInteger();
    Flux.using(() -> 1, r -> Flux.range(r, 10), cleanup::set)
            .subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .assertComplete()
            .assertNoError();
    Assert.assertEquals(1, cleanup.get());
}
Code example source: origin: reactor/reactor-core
@Test
public void factoryReturnsNull() {
    AssertSubscriber<Object> ts = AssertSubscriber.create();
    AtomicInteger cleanup = new AtomicInteger();
    Flux.<Integer, Integer>using(() -> 1,
            r -> null,
            cleanup::set,
            false).subscribe(ts);
    ts.assertNoValues()
            .assertNotComplete()
            .assertError(NullPointerException.class);
    Assert.assertEquals(1, cleanup.get());
}
Code example source: origin: reactor/reactor-core
@Test
public void factoryThrowsEager() {
    AssertSubscriber<Object> ts = AssertSubscriber.create();
    AtomicInteger cleanup = new AtomicInteger();
    Flux.using(() -> 1, r -> {
        throw new RuntimeException("forced failure");
    }, cleanup::set, false)
            .subscribe(ts);
    ts.assertNoValues()
            .assertNotComplete()
            .assertError(RuntimeException.class)
            .assertErrorMessage("forced failure");
    Assert.assertEquals(1, cleanup.get());
}
Code example source: origin: reactor/reactor-core
@Test
public void resourceThrowsEager() {
    AssertSubscriber<Object> ts = AssertSubscriber.create();
    AtomicInteger cleanup = new AtomicInteger();
    Flux.using(() -> {
        throw new RuntimeException("forced failure");
    }, r -> Flux.range(1, 10), cleanup::set, false)
            .subscribe(ts);
    ts.assertNoValues()
            .assertNotComplete()
            .assertError(RuntimeException.class)
            .assertErrorMessage("forced failure");
    Assert.assertEquals(0, cleanup.get());
}
Code example source: origin: reactor/reactor-core
@Test
public void normal() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    AtomicInteger cleanup = new AtomicInteger();
    Flux.using(() -> 1, r -> Flux.range(r, 10), cleanup::set, false)
            .subscribe(ts);
    ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .assertComplete()
            .assertNoError();
    Assert.assertEquals(1, cleanup.get());
}
Code example source: origin: reactor/reactor-core
Flux.using(() -> 1, r -> {
    if (fail) {
        return Flux.error(new RuntimeException("forced failure"));
        // (the remainder of this snippet is missing from the source)
Code example source: origin: reactor/reactor-core
@Test
public void subscriberCancels() {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    AtomicInteger cleanup = new AtomicInteger();
    DirectProcessor<Integer> tp = DirectProcessor.create();
    Flux.using(() -> 1, r -> tp, cleanup::set, true)
            .subscribe(ts);
    Assert.assertTrue("No subscriber?", tp.hasDownstreams());
    tp.onNext(1);
    ts.assertValues(1)
            .assertNotComplete()
            .assertNoError();
    ts.cancel();
    tp.onNext(2);
    ts.assertValues(1)
            .assertNotComplete()
            .assertNoError();
    Assert.assertFalse("Has subscriber?", tp.hasDownstreams());
    Assert.assertEquals(1, cleanup.get());
}
Code example source: origin: io.projectreactor/reactor-netty
@Override
default ByteBufEncodedFlux receive() {
    return ByteBufEncodedFlux.encoded(receiveParts().onBackpressureError()
            .concatMap(parts -> parts.aggregate()
                    .retain())
            .flatMap(bb ->
                    Flux.using(() -> bb,
                            Flux::just,
                            ReferenceCounted::release)),
            delegate().alloc());
}