本文整理了Java中reactor.core.publisher.Flux.publishNext()
方法的一些代码示例,展示了Flux.publishNext()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.publishNext()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:publishNext
[英]Prepare a Mono which shares this Flux sequence and dispatches the first observed item to subscribers in a backpressure-aware manner. This will effectively turn any type of sequence into a hot sequence when the first Subscriber subscribes.
[中]准备一个共享此通量序列的Mono,并以背压感知方式将第一个观察到的项目发送给订阅者。当第一个订户订阅时,这将有效地将任何类型的序列转换为热序列。
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<Long> deleteBy(Query query, Class<?> entityType, String index, String type) {
Assert.notNull(query, "Query must not be null!");
return doDeleteBy(query, getPersistentEntity(entityType), index, type).map(BulkByScrollResponse::getDeleted)
.publishNext();
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
return sendRequest(deleteRequest, RequestCreator.delete(), DeleteResponse.class, headers) //
.publishNext();
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
return sendRequest(deleteRequest, RequestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) //
.publishNext();
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
return sendRequest(indexRequest, RequestCreator.index(), IndexResponse.class, headers).publishNext();
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
@Override
public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
return sendRequest(updateRequest, RequestCreator.update(), UpdateResponse.class, headers).publishNext();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxCancelledByMonoProcessor() {
AtomicLong cancelCounter = new AtomicLong();
Flux.range(1, 10)
.doOnCancel(cancelCounter::incrementAndGet)
.publishNext()
.subscribe();
assertThat(cancelCounter.get()).isEqualTo(1);
}
代码示例来源:origin: reactor/reactor-core
.map(it -> it * 2)
.buffer()
.publishNext();
代码示例来源:origin: reactor/reactor-core
@Test
public void bufferWillAccumulateMultipleListsOfValues() {
//given: "a source and a collected flux"
EmitterProcessor<Integer> numbers = EmitterProcessor.create();
//non overlapping buffers
EmitterProcessor<Integer> boundaryFlux = EmitterProcessor.create();
MonoProcessor<List<List<Integer>>> res = numbers.buffer(boundaryFlux)
.buffer()
.publishNext()
.toProcessor();
res.subscribe();
numbers.onNext(1);
numbers.onNext(2);
numbers.onNext(3);
boundaryFlux.onNext(1);
numbers.onNext(5);
numbers.onNext(6);
numbers.onComplete();
//"the collected lists are available"
assertThat(res.block()).containsExactly(Arrays.asList(1, 2, 3), Arrays.asList(5, 6));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void windowWillAccumulateMultipleListsOfValues() {
//given: "a source and a collected flux"
EmitterProcessor<Integer> numbers = EmitterProcessor.create();
//non overlapping buffers
EmitterProcessor<Integer> boundaryFlux = EmitterProcessor.create();
MonoProcessor<List<List<Integer>>> res = numbers.window(boundaryFlux)
.concatMap(Flux::buffer)
.buffer()
.publishNext()
.toProcessor();
res.subscribe();
numbers.onNext(1);
numbers.onNext(2);
numbers.onNext(3);
boundaryFlux.onNext(1);
numbers.onNext(5);
numbers.onNext(6);
numbers.onComplete();
//"the collected lists are available"
assertThat(res.block()).containsExactly(Arrays.asList(1, 2, 3), Arrays.asList(5, 6));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void bufferWillAcumulateMultipleListsOfValuesOverlap() {
//given: "a source and a collected flux"
EmitterProcessor<Integer> numbers = EmitterProcessor.create();
EmitterProcessor<Integer> bucketOpening = EmitterProcessor.create();
//"overlapping buffers"
EmitterProcessor<Integer> boundaryFlux = EmitterProcessor.create();
MonoProcessor<List<List<Integer>>> res = numbers.bufferWhen(bucketOpening, u -> boundaryFlux )
.buffer()
.publishNext()
.toProcessor();
res.subscribe();
numbers.onNext(1);
numbers.onNext(2);
bucketOpening.onNext(1);
numbers.onNext(3);
bucketOpening.onNext(1);
numbers.onNext(5);
boundaryFlux.onNext(1);
bucketOpening.onNext(1);
boundaryFlux.onComplete();
numbers.onComplete();
//"the collected overlapping lists are available"
assertThat(res.block()).containsExactly(Arrays.asList(3, 5),
Collections.singletonList(5), Collections.emptyList());
}
代码示例来源:origin: reactor/reactor-core
@Test
public void windowWillAccumulateMultipleListsOfValuesOverlap() {
//given: "a source and a collected flux"
EmitterProcessor<Integer> numbers = EmitterProcessor.create();
EmitterProcessor<Integer> bucketOpening = EmitterProcessor.create();
//"overlapping buffers"
EmitterProcessor<Integer> boundaryFlux = EmitterProcessor.create();
MonoProcessor<List<List<Integer>>> res = numbers.windowWhen(bucketOpening, u -> boundaryFlux )
.flatMap(Flux::buffer)
.buffer()
.publishNext()
.toProcessor();
res.subscribe();
numbers.onNext(1);
numbers.onNext(2);
bucketOpening.onNext(1);
numbers.onNext(3);
bucketOpening.onNext(1);
numbers.onNext(5);
boundaryFlux.onNext(1);
bucketOpening.onNext(1);
boundaryFlux.onComplete();
numbers.onComplete();
//"the collected overlapping lists are available"
assertThat(res.block()).containsExactly(
Arrays.asList(3, 5),
Arrays.asList(5));
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @return
* @see reactor.core.publisher.Flux#publishNext()
*/
public final Mono<T> publishNext() {
return boxed.publishNext();
}
/**
内容来源于网络,如有侵权,请联系作者删除!