reactor.core.publisher.Flux.thenMany()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(11.4k)|赞(0)|评价(0)|浏览(303)

本文整理了Java中reactor.core.publisher.Flux.thenMany()方法的一些代码示例,展示了Flux.thenMany()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.thenMany()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:thenMany

Flux.thenMany介绍

[英]Let this Flux complete then play another Publisher.

In other words ignore element from this flux and transform the completion signal into a Publisher that will emit elements from the provided Publisher.
[中]让此流量完成,然后播放另一个发布者。
换句话说,忽略此流量中的元素,并将完成信号转换为发布服务器,该发布服务器将从提供的发布服务器发出元素。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test
public void thenManyThenMany(){
  StepVerifier.create(Flux.just(1, 2, 3).thenMany(Flux.just("test", "test2"))
              .thenMany(Flux.just(1L, 2L)))
        .expectNext(1L, 2L)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void thenManyError(){
  StepVerifier.create(Flux.error(new Exception("test")).thenMany(Flux.just(4, 5, 6)))
        .verifyErrorMessage("test");
}

代码示例来源:origin: spring-projects/spring-data-mongodb

protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,
    MongoWriter<Object> writer) {
  Assert.notNull(writer, "MongoWriter must not be null!");
  Mono<List<Tuple2<AdaptibleEntity<T>, Document>>> prepareDocuments = Flux.fromIterable(batchToSave)
      .map(uninitialized -> {
        BeforeConvertEvent<T> event = new BeforeConvertEvent<>(uninitialized, collectionName);
        T toConvert = maybeEmitEvent(event).getSource();
        AdaptibleEntity<T> entity = operations.forEntity(toConvert, mongoConverter.getConversionService());
        entity.assertUpdateableIdIfNotSet();
        T initialized = entity.initializeVersionProperty();
        Document dbDoc = entity.toMappedDocument(writer).getDocument();
        maybeEmitEvent(new BeforeSaveEvent<>(initialized, dbDoc, collectionName));
        return Tuples.of(entity, dbDoc);
      }).collectList();
  Flux<Tuple2<AdaptibleEntity<T>, Document>> insertDocuments = prepareDocuments.flatMapMany(tuples -> {
    List<Document> documents = tuples.stream().map(Tuple2::getT2).collect(Collectors.toList());
    return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
  });
  return insertDocuments.map(tuple -> {
    Object id = MappedDocument.of(tuple.getT2()).getId();
    T saved = tuple.getT1().populateIdIfNecessary(id);
    maybeEmitEvent(new AfterSaveEvent<>(saved, tuple.getT2(), collectionName));
    return saved;
  });
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testThenManyFusion() throws InterruptedException {
  Flux<Integer> test = Flux.just("A", "B")
              .thenMany(Flux.just("C", "D"))
              .thenMany(Flux.just(1, 2));
  Assert.assertTrue(test instanceof FluxConcatArray);
  FluxConcatArray<Integer> s = (FluxConcatArray<Integer>)test;
  Assert.assertTrue(s.array.length == 3);
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  test.subscribe(ts);
  ts.assertValues(1, 2);
  ts.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void thenMany(){
  StepVerifier.create(Flux.just(1, 2, 3).thenMany(Flux.just("test", "test2")))
        .expectNext("test", "test2")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void testThenManyDifferentType() throws InterruptedException {
    Flux<String> test = Flux.just(1, 2).thenMany(Flux.just("C", "D"));

    AssertSubscriber<String> ts = AssertSubscriber.create();
    test.subscribe(ts);
    ts.assertValues("C", "D");
    ts.assertComplete();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testThenManySameType() throws InterruptedException {
  Flux<String> test = Flux.just("A", "B")
              .thenMany(Flux.just("C", "D"));
  AssertSubscriber<String> ts = AssertSubscriber.create();
  test.subscribe(ts);
  ts.assertValues("C", "D");
  ts.assertComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void thenManySupplier(){
  StepVerifier.create(Flux.just(1, 2, 3).thenMany(Flux.defer(() -> Flux.just("test", "test2"))))
        .expectNext("test", "test2")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testThenManyFusion() {
  Flux<Integer> test = Mono.just("A")
               .thenMany(Flux.just("C", "D"))
               .thenMany(Flux.just(1, 2));
  Assert.assertTrue(test instanceof FluxConcatArray);
  FluxConcatArray<Integer> s = (FluxConcatArray<Integer>) test;
  Assert.assertTrue(s.array.length == 3);
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  test.subscribe(ts);
  ts.assertValues(1, 2);
  ts.assertComplete();
}

代码示例来源:origin: org.springframework.data/spring-data-mongodb

protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,
    MongoWriter<Object> writer) {
  Assert.notNull(writer, "MongoWriter must not be null!");
  Mono<List<Tuple2<AdaptibleEntity<T>, Document>>> prepareDocuments = Flux.fromIterable(batchToSave)
      .map(uninitialized -> {
        BeforeConvertEvent<T> event = new BeforeConvertEvent<>(uninitialized, collectionName);
        T toConvert = maybeEmitEvent(event).getSource();
        AdaptibleEntity<T> entity = operations.forEntity(toConvert, mongoConverter.getConversionService());
        entity.assertUpdateableIdIfNotSet();
        T initialized = entity.initializeVersionProperty();
        Document dbDoc = entity.toMappedDocument(writer).getDocument();
        maybeEmitEvent(new BeforeSaveEvent<>(initialized, dbDoc, collectionName));
        return Tuples.of(entity, dbDoc);
      }).collectList();
  Flux<Tuple2<AdaptibleEntity<T>, Document>> insertDocuments = prepareDocuments.flatMapMany(tuples -> {
    List<Document> documents = tuples.stream().map(Tuple2::getT2).collect(Collectors.toList());
    return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
  });
  return insertDocuments.map(tuple -> {
    Object id = MappedDocument.of(tuple.getT2()).getId();
    T saved = tuple.getT1().populateIdIfNecessary(id);
    maybeEmitEvent(new AfterSaveEvent<>(saved, tuple.getT2(), collectionName));
    return saved;
  });
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param afterSupplier
 * @return
 * @see reactor.core.publisher.Flux#thenMany(java.util.function.Supplier)
 */
public final <V> Flux<V> thenMany(Supplier<? extends Publisher<V>> afterSupplier) {
  return boxed.thenMany(afterSupplier);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param other
 * @return
 * @see reactor.core.publisher.Flux#thenMany(org.reactivestreams.Publisher)
 */
public final <V> Flux<V> thenMany(Publisher<V> other) {
  return boxed.thenMany(other);
}
/**

代码示例来源:origin: joshlong/reactive-spring-article

@Override
  public void run(ApplicationArguments args) throws Exception {

    // @formatter:off
    this.bookRepository
      .deleteAll()
      .thenMany(
       Flux.just("Professional Java Development with the Spring Framework|rjohnson",
        "Cloud Native Java|jlong", "Spring Security 3.1|rwinch", "Spring in Action|cwalls"))
      .map(t -> t.split("\\|"))
      .map(tuple -> new Book(null, tuple[0], tuple[1]))
      .flatMap(this.bookRepository::save)
      .thenMany(this.bookRepository.findAll())
      .subscribe(book -> log.info(book.toString()));
    // @formatter:on
  }
}

代码示例来源:origin: LuoLiangDSGA/spring-learning

@PostConstruct
  public void loadData() {
    factory.getReactiveConnection().serverCommands().flushAll()
        .thenMany(Flux.just("Thor", "Hulk", "Tony")
            .map(name -> new User(UUID.randomUUID().toString().substring(0, 5), name, "123456"))
            .flatMap(user -> redisOperations.opsForValue().set(user.getId(), user))
        ).thenMany(redisOperations.keys("*")
        .flatMap(redisOperations.opsForValue()::get))
        .subscribe(System.out::println);
  }
}

代码示例来源:origin: hantsy/spring-reactive-sample

@EventListener(value = ContextRefreshedEvent.class)
public void init() {
  log.info("start data initialization  ...");
  this.databaseClient.insert()
    .into("posts")
    //.nullValue("id", Integer.class)
    .value("title", "First post title")
    .value("content", "Content of my first post")
    .map((r, m) -> r.get("id", Integer.class)).all()
    .log()
    .thenMany(
      this.databaseClient.select()
        .from("posts")
        .orderBy(Sort.by(desc("id")))
        .as(Post.class)
        .fetch()
        .all()
        .log()
    )
    .subscribe(null, null, () -> log.info("initialization is done..."));
}

代码示例来源:origin: hantsy/spring-reactive-sample

@EventListener(value = ContextRefreshedEvent.class)
public void init() {
  log.info("start data initialization  ...");
  this.databaseClient.insert()
    .into("posts")
    //.nullValue("id", Integer.class)
    .value("title", "First post title")
    .value("content", "Content of my first post")
    .map((r, m) -> r.get("id", Integer.class))
    .all()
    .log()
    .thenMany(
      this.databaseClient.select()
        .from("posts")
        .orderBy(Sort.by(desc("id")))
        .as(Post.class)
        .fetch()
        .all()
        .log()
    )
    .subscribe(null, null, () -> log.info("initialization is done..."));
}

代码示例来源:origin: elder-oss/sourcerer

@Test
@Ignore // Manual only
public void testRetriesOnStreamError() {
  EventRepository<String> repository = mock(EventRepository.class);
  EventSubscriptionPositionSource positionSource =
      mock(EventSubscriptionPositionSource.class);
  when(repository.getPublisher(any())).then(position -> {
    WorkQueueProcessor<EventRecord<String>> processor = WorkQueueProcessor.create();
    Flux<EventRecord<String>> eventSource = Flux
        .fromStream(IntStream.range(0, 1000000).mapToObj(this::wrapIntAsEvent))
        .doOnNext(e -> {
          lastProducedValue = e.getEvent();
        });
    eventSource.subscribe(processor);
    return processor.take(100).thenMany(Flux.error(new RuntimeException("fail!")));
  });
  when(positionSource.getSubscriptionPosition()).thenReturn(null);
  SlowSubscriptionHandler<String> subscriptionHandler =
      new SlowSubscriptionHandler<>();
  EventSubscriptionManager subscriptionManager = new EventSubscriptionManager<>(
      repository,
      positionSource,
      subscriptionHandler,
      new SubscriptionWorkerConfig().withBatchSize(64));
  SubscriptionToken token = subscriptionManager.start();
  sleep(100000);
  token.stop();
}

代码示例来源:origin: joshlong/reactive-spring-online-training

@Override
    public void run(ApplicationArguments args) throws Exception {

        Author viktor = new Author("viktorklang");
        Author jboner = new Author("jboner");
        Author josh = new Author("starbuxman");

        Flux<Tweet> tweets = Flux.just(
          new Tweet(viktor, "woot, @Konrad will be talking #enterprise #integration done right!"),
          new Tweet(viktor, "#scala implicits can easily be used to model capabilities, but can they encode obligations easily?"),
          new Tweet(viktor, "this is so cool! #akka"),
          new Tweet(jboner, "cross data center replication of event sourced #akka actors is soon available (using #crdts and more!)"),
          new Tweet(josh, "a reminder: @SpringBoot lets you pair program with the #Spring team. #bootiful"),
          new Tweet(josh, "whatever your next platform is, don't build it yourself.\n\n" +
            "Even companies with the $$ and the motivation to do it fail. A LOT. #bootiful")
        );
        this
          .tweetRepository
          .deleteAll()
          .thenMany(tweets.flatMap(tweetRepository::save))
          .thenMany(tweetRepository.findAll())
//                    .subscribeOn(Schedulers.fromExecutor(Executors.newSingleThreadExecutor()))
          .subscribe(System.out::println);
    }
}

代码示例来源:origin: danielfernandez/reactive-matchday

.concatMap(match -> mongoTemplate.insert(new MatchEvent(match.getId(), MatchEvent.Type.MATCH_START, null, null)))
.thenMany(Flux.fromIterable(Data.PLAYERS))
.flatMap(mongoTemplate::insert)
.log(LOGGER_INITIALIZE, Level.FINEST)

代码示例来源:origin: reactor/reactor-netty

in.receive()
 .take(1)
 .thenMany(Flux.defer(() ->
     out.withConnection(c ->
         c.addHandlerFirst(new HttpResponseEncoder()))

相关文章

Flux类方法