java Postgres r2dbc批量插入问题

osh3o9ms  于 2023-03-21  发布在  Java
关注(0)|答案(1)|浏览(165)

我正在尝试将文章的Flux-stream批量持久化到Postgres r2 dbc

应用程序有两个相同的article表(article_origin和article_destiny)。origin表有一百万篇文章,我试图将其批量插入到空的article_destiny表中!Web客户端调用article端点并请求单个Flux流。应用程序在没有批量插入的情况下工作,但需要超过20分钟才能逐个持久化所有文章。
文章到达articleDestinyBatchStream,缓冲区(以back-pressure作为参数)生成批量大小的块。
但是**.map(connection -〉{**被跳过了,原因我不明白!
请帮帮忙
接下来找到相关的代码段:

Web客户端

public void getArticleBatchStreamToOrigin(Long backPressure) {
try {
    client.get()
            .uri("/article-batch-stream-from-origin/" + backPressure)
            .accept(MediaType.TEXT_EVENT_STREAM) //APPLICATION_NDJSON TEXT_EVENT_STREAM
            .retrieve()
            .bodyToFlux(ArticleDestiny.class)
            .buffer(backPressure.intValue())
            .subscribe(articles -> {
                streamService.articleDestinyBatchStream(articles);
                log.info("at client article batch size={} ", articles.size());
            });
} catch (WebClientResponseException wcre) {
    log.error("Error Response Code is {} and Response Body is {}", wcre.getRawStatusCode(), wcre.getResponseBodyAsString());
    log.error("Exception in method getArticleStream()", wcre);
    throw wcre;
} catch (Exception ex) {
    log.error("Exception in method getArticleStream()", ex);
    throw ex;
}

}

客户端调用的文章流媒体服务(百万篇)

public Flux<ArticleOrigin> getAllArticleStreamFromOrigin(Long backPressure) {
return articleOriginRepository
        .findAll()
        .doOnNext(a -> log.debug("at stream service article id={}", a.getId()));

}

文章批次持久化:

public void articleDestinyBatchStream(List<ArticleDestiny> articles) {
log.info("connectionFactory {}", connectionFactory.getMetadata().getName());
Mono.from(connectionFactory.create())
      .map(connection -> { // this part is skipped, hmmmm!
      Batch batch = connection.createBatch();
             batch.add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('DE', '123', 'some name', '23.23')");
             batch.add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('BE', '456', 'some other name', '44.44')");
             batch.add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('NL', '789', 'some new name', '55.55')");

         return batch.execute();
      });

}

neekobn8

neekobn81#

React链完成后,由于没有用户,因此跳过。
试试下面的代码片段。

public void articleDestinyBatchStream(List<String> articles) {
        log.info("connectionFactory {}", connectionFactory.getMetadata().getName());

        Mono.from(connectionFactory.create())
                .flatMapMany(connection -> Flux.from(connection.createBatch()
                        .add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('DE', '123', 'some name', '23.23')")
                        .add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('BE', '456', 'some other name', '44.44')")
                        .add("INSERT INTO article_destiny (country_code, art_number, name, price) values ('NL', '789', 'some new name', '55.55')")
                        .execute()))
                .then();

    }

then()语句丢弃任何返回值,并在所有操作完成时向调用客户端发出信号。

相关问题