本文整理了Java中reactor.core.publisher.Flux.thenMany()
方法的一些代码示例,展示了Flux.thenMany()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.thenMany()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:thenMany
[英]Let this Flux complete then play another Publisher.
In other words ignore element from this flux and transform the completion signal into a Publisher that will emit elements from the provided Publisher.
[中]让此流量完成,然后播放另一个发布者。
换句话说,忽略此流量中的元素,并将完成信号转换为发布服务器,该发布服务器将从提供的发布服务器发出元素。
代码示例来源:origin: reactor/reactor-core
@Test
public void thenManyThenMany(){
StepVerifier.create(Flux.just(1, 2, 3).thenMany(Flux.just("test", "test2"))
.thenMany(Flux.just(1L, 2L)))
.expectNext(1L, 2L)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void thenManyError(){
StepVerifier.create(Flux.error(new Exception("test")).thenMany(Flux.just(4, 5, 6)))
.verifyErrorMessage("test");
}
代码示例来源:origin: spring-projects/spring-data-mongodb
protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,
MongoWriter<Object> writer) {
Assert.notNull(writer, "MongoWriter must not be null!");
Mono<List<Tuple2<AdaptibleEntity<T>, Document>>> prepareDocuments = Flux.fromIterable(batchToSave)
.map(uninitialized -> {
BeforeConvertEvent<T> event = new BeforeConvertEvent<>(uninitialized, collectionName);
T toConvert = maybeEmitEvent(event).getSource();
AdaptibleEntity<T> entity = operations.forEntity(toConvert, mongoConverter.getConversionService());
entity.assertUpdateableIdIfNotSet();
T initialized = entity.initializeVersionProperty();
Document dbDoc = entity.toMappedDocument(writer).getDocument();
maybeEmitEvent(new BeforeSaveEvent<>(initialized, dbDoc, collectionName));
return Tuples.of(entity, dbDoc);
}).collectList();
Flux<Tuple2<AdaptibleEntity<T>, Document>> insertDocuments = prepareDocuments.flatMapMany(tuples -> {
List<Document> documents = tuples.stream().map(Tuple2::getT2).collect(Collectors.toList());
return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
});
return insertDocuments.map(tuple -> {
Object id = MappedDocument.of(tuple.getT2()).getId();
T saved = tuple.getT1().populateIdIfNecessary(id);
maybeEmitEvent(new AfterSaveEvent<>(saved, tuple.getT2(), collectionName));
return saved;
});
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testThenManyFusion() throws InterruptedException {
Flux<Integer> test = Flux.just("A", "B")
.thenMany(Flux.just("C", "D"))
.thenMany(Flux.just(1, 2));
Assert.assertTrue(test instanceof FluxConcatArray);
FluxConcatArray<Integer> s = (FluxConcatArray<Integer>)test;
Assert.assertTrue(s.array.length == 3);
AssertSubscriber<Integer> ts = AssertSubscriber.create();
test.subscribe(ts);
ts.assertValues(1, 2);
ts.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void thenMany(){
StepVerifier.create(Flux.just(1, 2, 3).thenMany(Flux.just("test", "test2")))
.expectNext("test", "test2")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testThenManyDifferentType() throws InterruptedException {
Flux<String> test = Flux.just(1, 2).thenMany(Flux.just("C", "D"));
AssertSubscriber<String> ts = AssertSubscriber.create();
test.subscribe(ts);
ts.assertValues("C", "D");
ts.assertComplete();
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testThenManySameType() throws InterruptedException {
Flux<String> test = Flux.just("A", "B")
.thenMany(Flux.just("C", "D"));
AssertSubscriber<String> ts = AssertSubscriber.create();
test.subscribe(ts);
ts.assertValues("C", "D");
ts.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void thenManySupplier(){
StepVerifier.create(Flux.just(1, 2, 3).thenMany(Flux.defer(() -> Flux.just("test", "test2"))))
.expectNext("test", "test2")
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testThenManyFusion() {
Flux<Integer> test = Mono.just("A")
.thenMany(Flux.just("C", "D"))
.thenMany(Flux.just(1, 2));
Assert.assertTrue(test instanceof FluxConcatArray);
FluxConcatArray<Integer> s = (FluxConcatArray<Integer>) test;
Assert.assertTrue(s.array.length == 3);
AssertSubscriber<Integer> ts = AssertSubscriber.create();
test.subscribe(ts);
ts.assertValues(1, 2);
ts.assertComplete();
}
代码示例来源:origin: org.springframework.data/spring-data-mongodb
protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,
MongoWriter<Object> writer) {
Assert.notNull(writer, "MongoWriter must not be null!");
Mono<List<Tuple2<AdaptibleEntity<T>, Document>>> prepareDocuments = Flux.fromIterable(batchToSave)
.map(uninitialized -> {
BeforeConvertEvent<T> event = new BeforeConvertEvent<>(uninitialized, collectionName);
T toConvert = maybeEmitEvent(event).getSource();
AdaptibleEntity<T> entity = operations.forEntity(toConvert, mongoConverter.getConversionService());
entity.assertUpdateableIdIfNotSet();
T initialized = entity.initializeVersionProperty();
Document dbDoc = entity.toMappedDocument(writer).getDocument();
maybeEmitEvent(new BeforeSaveEvent<>(initialized, dbDoc, collectionName));
return Tuples.of(entity, dbDoc);
}).collectList();
Flux<Tuple2<AdaptibleEntity<T>, Document>> insertDocuments = prepareDocuments.flatMapMany(tuples -> {
List<Document> documents = tuples.stream().map(Tuple2::getT2).collect(Collectors.toList());
return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
});
return insertDocuments.map(tuple -> {
Object id = MappedDocument.of(tuple.getT2()).getId();
T saved = tuple.getT1().populateIdIfNecessary(id);
maybeEmitEvent(new AfterSaveEvent<>(saved, tuple.getT2(), collectionName));
return saved;
});
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param afterSupplier
* @return
* @see reactor.core.publisher.Flux#thenMany(java.util.function.Supplier)
*/
public final <V> Flux<V> thenMany(Supplier<? extends Publisher<V>> afterSupplier) {
return boxed.thenMany(afterSupplier);
}
/**
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param other
* @return
* @see reactor.core.publisher.Flux#thenMany(org.reactivestreams.Publisher)
*/
public final <V> Flux<V> thenMany(Publisher<V> other) {
return boxed.thenMany(other);
}
/**
代码示例来源:origin: joshlong/reactive-spring-article
@Override
public void run(ApplicationArguments args) throws Exception {
// @formatter:off
this.bookRepository
.deleteAll()
.thenMany(
Flux.just("Professional Java Development with the Spring Framework|rjohnson",
"Cloud Native Java|jlong", "Spring Security 3.1|rwinch", "Spring in Action|cwalls"))
.map(t -> t.split("\\|"))
.map(tuple -> new Book(null, tuple[0], tuple[1]))
.flatMap(this.bookRepository::save)
.thenMany(this.bookRepository.findAll())
.subscribe(book -> log.info(book.toString()));
// @formatter:on
}
}
代码示例来源:origin: LuoLiangDSGA/spring-learning
@PostConstruct
public void loadData() {
factory.getReactiveConnection().serverCommands().flushAll()
.thenMany(Flux.just("Thor", "Hulk", "Tony")
.map(name -> new User(UUID.randomUUID().toString().substring(0, 5), name, "123456"))
.flatMap(user -> redisOperations.opsForValue().set(user.getId(), user))
).thenMany(redisOperations.keys("*")
.flatMap(redisOperations.opsForValue()::get))
.subscribe(System.out::println);
}
}
代码示例来源:origin: hantsy/spring-reactive-sample
@EventListener(value = ContextRefreshedEvent.class)
public void init() {
log.info("start data initialization ...");
this.databaseClient.insert()
.into("posts")
//.nullValue("id", Integer.class)
.value("title", "First post title")
.value("content", "Content of my first post")
.map((r, m) -> r.get("id", Integer.class)).all()
.log()
.thenMany(
this.databaseClient.select()
.from("posts")
.orderBy(Sort.by(desc("id")))
.as(Post.class)
.fetch()
.all()
.log()
)
.subscribe(null, null, () -> log.info("initialization is done..."));
}
代码示例来源:origin: hantsy/spring-reactive-sample
@EventListener(value = ContextRefreshedEvent.class)
public void init() {
log.info("start data initialization ...");
this.databaseClient.insert()
.into("posts")
//.nullValue("id", Integer.class)
.value("title", "First post title")
.value("content", "Content of my first post")
.map((r, m) -> r.get("id", Integer.class))
.all()
.log()
.thenMany(
this.databaseClient.select()
.from("posts")
.orderBy(Sort.by(desc("id")))
.as(Post.class)
.fetch()
.all()
.log()
)
.subscribe(null, null, () -> log.info("initialization is done..."));
}
代码示例来源:origin: elder-oss/sourcerer
@Test
@Ignore // Manual only
public void testRetriesOnStreamError() {
EventRepository<String> repository = mock(EventRepository.class);
EventSubscriptionPositionSource positionSource =
mock(EventSubscriptionPositionSource.class);
when(repository.getPublisher(any())).then(position -> {
WorkQueueProcessor<EventRecord<String>> processor = WorkQueueProcessor.create();
Flux<EventRecord<String>> eventSource = Flux
.fromStream(IntStream.range(0, 1000000).mapToObj(this::wrapIntAsEvent))
.doOnNext(e -> {
lastProducedValue = e.getEvent();
});
eventSource.subscribe(processor);
return processor.take(100).thenMany(Flux.error(new RuntimeException("fail!")));
});
when(positionSource.getSubscriptionPosition()).thenReturn(null);
SlowSubscriptionHandler<String> subscriptionHandler =
new SlowSubscriptionHandler<>();
EventSubscriptionManager subscriptionManager = new EventSubscriptionManager<>(
repository,
positionSource,
subscriptionHandler,
new SubscriptionWorkerConfig().withBatchSize(64));
SubscriptionToken token = subscriptionManager.start();
sleep(100000);
token.stop();
}
代码示例来源:origin: joshlong/reactive-spring-online-training
@Override
public void run(ApplicationArguments args) throws Exception {
Author viktor = new Author("viktorklang");
Author jboner = new Author("jboner");
Author josh = new Author("starbuxman");
Flux<Tweet> tweets = Flux.just(
new Tweet(viktor, "woot, @Konrad will be talking #enterprise #integration done right!"),
new Tweet(viktor, "#scala implicits can easily be used to model capabilities, but can they encode obligations easily?"),
new Tweet(viktor, "this is so cool! #akka"),
new Tweet(jboner, "cross data center replication of event sourced #akka actors is soon available (using #crdts and more!)"),
new Tweet(josh, "a reminder: @SpringBoot lets you pair program with the #Spring team. #bootiful"),
new Tweet(josh, "whatever your next platform is, don't build it yourself.\n\n" +
"Even companies with the $$ and the motivation to do it fail. A LOT. #bootiful")
);
this
.tweetRepository
.deleteAll()
.thenMany(tweets.flatMap(tweetRepository::save))
.thenMany(tweetRepository.findAll())
// .subscribeOn(Schedulers.fromExecutor(Executors.newSingleThreadExecutor()))
.subscribe(System.out::println);
}
}
代码示例来源:origin: danielfernandez/reactive-matchday
.concatMap(match -> mongoTemplate.insert(new MatchEvent(match.getId(), MatchEvent.Type.MATCH_START, null, null)))
.thenMany(Flux.fromIterable(Data.PLAYERS))
.flatMap(mongoTemplate::insert)
.log(LOGGER_INITIALIZE, Level.FINEST)
代码示例来源:origin: reactor/reactor-netty
in.receive()
.take(1)
.thenMany(Flux.defer(() ->
out.withConnection(c ->
c.addHandlerFirst(new HttpResponseEncoder()))
内容来源于网络,如有侵权,请联系作者删除!