This article collects code examples of the Java method reactor.core.publisher.Flux.doAfterTerminate(), showing how Flux.doAfterTerminate() is used in practice. The examples are extracted from selected open-source projects found on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as useful references. Details of Flux.doAfterTerminate():
Package: reactor.core.publisher
Class: Flux
Method: doAfterTerminate
Description: Add behavior (side-effect) triggered after the Flux terminates, either by completing downstream successfully or with an error.
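Before the project-sourced examples below, here is a minimal, self-contained sketch of the behavior described above (the class name is illustrative; it assumes reactor-core on the classpath). Because Flux.just emits synchronously, the callback has already run by the time subscribe() returns:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;

public class DoAfterTerminateDemo {
    public static void main(String[] args) {
        AtomicBoolean afterTerminate = new AtomicBoolean(false);

        Flux.just("a", "b")
            // runs after the terminal signal (onComplete here) has been
            // propagated downstream
            .doAfterTerminate(() -> afterTerminate.set(true))
            .subscribe(System.out::println);

        System.out.println("afterTerminate=" + afterTerminate.get()); // prints "afterTerminate=true"
    }
}
```

Note the contrast with doOnTerminate, which fires just before the terminal signal reaches the downstream subscriber, whereas doAfterTerminate fires after it has been propagated.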
Code example from: reactor/reactor-core

@Test
public void fluxCanListenForTerminalStates() {
    // "A deferred Flux can listen for terminal states"
    // given: "a composable with an initial value"
    Flux<String> stream = Flux.just("test");

    // when: "the complete signal is observed and flux is retrieved"
    AtomicReference<Object> value = new AtomicReference<>();
    stream.doAfterTerminate(() -> value.set(Boolean.TRUE))
          .subscribe(value::set);

    // then: "it is available"
    assertThat(value.get())
            .isNotNull()
            .isNotEqualTo("test")
            .isEqualTo(Boolean.TRUE);
}
Code example from: reactor/reactor-core

@Test
public void syncPollAfterTerminateCalled() {
    AtomicBoolean onAfterTerminate = new AtomicBoolean();
    ConnectableFlux<Integer> f = Flux.just(1)
                                     .doAfterTerminate(() -> onAfterTerminate.set(true))
                                     .publish();

    StepVerifier.create(f)
                .then(f::connect)
                .expectNext(1)
                .verifyComplete();

    assertThat(onAfterTerminate.get()).withFailMessage("onAfterTerminate not called back").isTrue();
}
Code example from: reactor/reactor-core

@Test
public void syncPollConditionalAfterTerminateCalled() {
    AtomicBoolean onAfterTerminate = new AtomicBoolean();
    ConnectableFlux<Integer> f = Flux.just(1)
                                     .doAfterTerminate(() -> onAfterTerminate.set(true))
                                     .filter(v -> true)
                                     .publish();

    StepVerifier.create(f)
                .then(f::connect)
                .expectNext(1)
                .verifyComplete();

    assertThat(onAfterTerminate.get()).withFailMessage("onAfterTerminate not called back").isTrue();
}
Code example from: reactor/reactor-core

@Test
public void noFusionAfterTerminateCalled() {
    AtomicBoolean onTerminate = new AtomicBoolean();
    AssertSubscriber<Object> ts = AssertSubscriber.create();

    Flux.range(1, 2)
        .doAfterTerminate(() -> onTerminate.set(true))
        .subscribe(ts);

    ts.assertNoError()
      .assertValues(1, 2)
      .assertComplete();

    Assert.assertTrue("onComplete not called back", onTerminate.get());
}
Code example from: reactor/reactor-core

@Test
public void syncdoAfterTerminateCalled() {
    AtomicBoolean onTerminate = new AtomicBoolean();
    AssertSubscriber<Object> ts = AssertSubscriber.create();

    Flux.range(1, 2)
        .hide()
        .doAfterTerminate(() -> onTerminate.set(true))
        .subscribe(ts);

    ts.assertNoError()
      .assertValues(1, 2)
      .assertComplete();

    Assert.assertTrue("onComplete not called back", onTerminate.get());
}
Code example from: reactor/reactor-core

combinedScenarios.addAll(Arrays.asList(scenario(f -> f.doAfterTerminate(() -> {
    throw droppedException();
})).fusionMode(Fuseable.NONE)
Code example from: reactor/reactor-core

@Override
Flux<Integer> transformFlux(Flux<Integer> f) {
    Flux<String> otherStream = Flux.just("test", "test2", "test3");
    // System.out.println("Providing new downstream");
    Scheduler asyncGroup = Schedulers.newParallel("flux-p-tck", 2);

    BiFunction<Integer, String, Integer> combinator = (t1, t2) -> t1;

    return f.publishOn(sharedGroup)
            .parallel(2)
            .groups()
            .flatMap(stream -> stream.publishOn(asyncGroup)
                                     .doOnNext(this::monitorThreadUse)
                                     .scan((prev, next) -> next)
                                     .map(integer -> -integer)
                                     .filter(integer -> integer <= 0)
                                     .map(integer -> -integer)
                                     .bufferTimeout(batch, Duration.ofMillis(50))
                                     .flatMap(Flux::fromIterable)
                                     .flatMap(i -> Flux.zip(Flux.just(i), otherStream, combinator))
            )
            .publishOn(sharedGroup)
            .doAfterTerminate(asyncGroup::dispose)
            .doOnError(Throwable::printStackTrace);
}
Code example from: reactor/reactor-core

@Test
public void sampleTest() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);

    Disposable top10every1second =
            Flux.fromIterable(PULP_SAMPLE)
                .publishOn(asyncGroup)
                .flatMap(samuelJackson ->
                        Flux
                            .fromArray(samuelJackson.split(" "))
                            .publishOn(asyncGroup)
                            .filter(w -> !w.trim().isEmpty())
                            .doOnNext(i -> simulateLatency())
                )
                .window(Duration.ofSeconds(2))
                .flatMap(s -> s.groupBy(w -> w)
                               .flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
                               .collectSortedList((a, b) -> -a.getT2().compareTo(b.getT2()))
                               .flatMapMany(Flux::fromIterable)
                               .take(10)
                               .doAfterTerminate(() -> LOG.info("------------------------ window terminated" +
                                       "----------------------"))
                )
                .subscribe(
                        entry -> LOG.info(entry.getT1() + ": " + entry.getT2()),
                        error -> LOG.error("", error),
                        latch::countDown
                );

    awaitLatch(top10every1second, latch);
}
Code example from: reactor/reactor-core

})).producerEmpty(),
scenario(f -> f.doAfterTerminate(() -> {
    throw exception();
})).producerEmpty(),
Code example from: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
    return Arrays.asList(scenario(f -> f.doOnSubscribe(s -> {
            })),
            scenario(f -> f.doOnError(s -> {
            })),
            scenario(f -> f.doOnTerminate(() -> {
            })),
            scenario(f -> f.doAfterTerminate(() -> {
            })),
            scenario(f -> f.doOnCancel(() -> {
            })),
            scenario(f -> f.doOnComplete(() -> {
            })),
            scenario(f -> f.doOnRequest(d -> {
            })),
            scenario(f -> f.doOnRequest(s -> {
                throw new RuntimeException(); //ignored
            })),
            scenario(f -> f.doOnNext(s -> {
            })),
            scenario(f -> f.doOnError(s -> {
            })));
}
Code example from: com.aol.cyclops/cyclops-reactor

/**
 * @param afterTerminate
 * @return
 * @see reactor.core.publisher.Flux#doAfterTerminate(java.lang.Runnable)
 */
public final Flux<T> doAfterTerminate(Runnable afterTerminate) {
    return boxed.doAfterTerminate(afterTerminate);
}
Code example from: reactor/reactor-kafka

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
    this.ackMode = AckMode.AUTO_ACK;
    Flux<ConsumerRecords<K, V>> flux = withDoOnRequest(createConsumerFlux());
    return flux
            .map(consumerRecords -> Flux.fromIterable(consumerRecords)
                                        .doAfterTerminate(() -> {
                                            for (ConsumerRecord<K, V> r : consumerRecords)
                                                new CommittableOffset(r).acknowledge();
                                        }));
}
Code example from: io.projectreactor.kafka/reactor-kafka

private synchronized Flux<ConsumerRecords<K, V>> createConsumerFlux() {
    if (consumerFlux != null)
        throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");

    Consumer<Flux<?>> kafkaSubscribeOrAssign = (flux) -> receiverOptions.subscriber(this).accept(consumer);
    initEvent = new InitEvent(kafkaSubscribeOrAssign);
    pollEvent = new PollEvent();
    commitEvent = new CommitEvent();

    recordEmitter = EmitterProcessor.create();
    recordSubmission = recordEmitter.sink();

    scheduler = Schedulers.single(receiverOptions.schedulerSupplier().get());
    consumerFlux = recordEmitter
            .publishOn(scheduler)
            .doOnSubscribe(s -> {
                try {
                    start();
                } catch (Exception e) {
                    log.error("Subscription to event flux failed", e);
                    throw e;
                }
            })
            .doOnRequest(r -> {
                if (requestsPending.get() > 0)
                    pollEvent.scheduleIfRequired();
            })
            .doAfterTerminate(this::dispose)
            .doOnCancel(this::dispose);
    return consumerFlux;
}
Code example from: io.projectreactor.kafka/reactor-kafka

private Flux<ConsumerRecord<K, V>> transactionalRecords(TransactionManager transactionManager, ConsumerRecords<K, V> records) {
    if (records.isEmpty())
        return Flux.empty();

    CommittableBatch offsetBatch = new CommittableBatch();
    for (ConsumerRecord<K, V> r : records)
        offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset());

    return Flux.fromIterable(records)
               .concatWith(transactionManager.sendOffsets(offsetBatch.getAndClearOffsets().offsets(), receiverOptions.groupId()))
               .doAfterTerminate(() -> awaitingTransaction.set(false));
}
Code example from: reactor/reactor-netty

.doAfterTerminate(encoder);
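As the method description states, the callback also fires when the Flux terminates with an error, not only on successful completion. A minimal sketch (class name illustrative; assumes reactor-core on the classpath):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;

public class DoAfterTerminateErrorDemo {
    public static void main(String[] args) {
        AtomicBoolean afterTerminate = new AtomicBoolean(false);

        Flux.<String>error(new IllegalStateException("boom"))
            // fires after onError has been propagated downstream as well
            .doAfterTerminate(() -> afterTerminate.set(true))
            .subscribe(
                    v -> {},
                    e -> System.out.println("caught: " + e.getMessage()));

        System.out.println("afterTerminate=" + afterTerminate.get()); // prints "afterTerminate=true"
    }
}
```

This error-path guarantee is what makes doAfterTerminate a common place for cleanup such as scheduler or resource disposal, as in the reactor-kafka createConsumerFlux example above.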