java Spring webflux -一个接一个地运行作业

nx7onnlm  于 2023-03-21  发布在  Java
关注(0)|答案(1)|浏览(158)

我正在使用Spring webflux并运行多个作业。每个作业将连接到一个分析DB并运行一个查询,可能需要大约30分钟。我需要运行200个这样的作业,我只能有10个连接到DB。所以,使用下面的方法,获得“HikariPool-2 - Connection is not available,request timed out after 30005 ms”,因为10个连接已经在使用中,队列中的其他查询正在超时。处理此问题的替代方法是什么?

private Mono<String> runJobs(final ReconDto req) {
return jobRepo.fetchJobs().collectList()
                               .zipWhen((jobs) -> Flux.fromIterable(jobs)
                                                        .flatMap((job) -> runJob( job)).collectList())
                               .flatMap(response -> {
                                   return Mono.just(req.getJobName());
                               });
}

private Mono<String> runJob(final JobMeta job) {
  // Here connecting to an analytical DB and running a query which might take 30 mins or so.
}
fjaof16o

fjaof16o1#

您可以使用flatMapconcurrency参数来限制动态内部发布者的最大数量。

Mono<String> runJobs(final ReconDto req) {
    return jobRepo.fetchJobs()
            .flatMap(job -> runJob(job), 10)
            .then(Mono.just(req.getJobName()));
}

在非React性术语中,这是具有10个线程的ThreadPoolExecutor的模拟。

相关问题