我需要将一些项目从非React性存储库转移到React性存储库(firestore)。
该过程是从netty公开的rest端点触发的。下面的代码是我经过一些尝试和错误后编写的。
非React式回购的查询时间不长(约20秒),但返回大量记录,执行时间通常约60分钟。
所有记录都会被保存,所有的“正在保存…”。。。打印“”,但如果次数超过50%,则不会打印“保存的记录”,也不会打印错误。
我注意到:
更多记录->更高的失败概率
它不依赖于执行时间(有时比失败的进程完成的时间长)
该应用程序运行在k8s吊舱上,ram请求为1500英里,极限为3000英里,从图表上看,它从未接近极限。
我错过了什么?
@Slf4j
@RestController
@RequestMapping("/import")
public class ImportController {
@Autowired
private NotReactiveRepository notReactiveRepository;
@Autowired
private ReactiveRepository reactiveRepository;
private static final Scheduler queryScheduler = Schedulers.newBoundedElastic(1, 480, "query", 864000);// max 10 days processing time
@GetMapping("/start")
public Mono<String> start() {
log.info("Start");
return Mono.just("RECEIVED")
//fire and forget
.doOnNext(stringRouteResponse -> startProcess().subscribe());
}
private Mono<Long> startProcess() {
Mono<List<Items>> resultsBlockingMono = Mono
.fromCallable(() -> notReactiveRepository.findAll())
.subscribeOn(queryScheduler)
.retryWhen(Retry.backoff(5, Duration.of(2, ChronoUnit.SECONDS)));
return resultsBlockingMono
.doOnNext( records -> log.info("Records: {}", records.size()))
.flatMapMany(Flux::fromIterable)
.map(ItemConverter::convert)
// max 9000 save/sec
.delayElements(Duration.of(300, ChronoUnit.MICROS))
.flatMap(this::saveConvertedItem)
.zipWith(Flux.range(1, Integer.MAX_VALUE))
.doOnNext(savedAndIndex -> log.info("Saving in progress... {}", savedAndIndex.getT2()))
.count()
.doOnNext( numberOfSaved -> log.info("Saved {} records", numberOfSaved));
}
private Mono<ConvertedItem> saveConvertedItem(ConvertedItem convertedItem) {
return reactiveRepository.save(convertedItem)
.retryWhen(Retry.backoff(1000, Duration.of(2, ChronoUnit.MILLIS)))
.onErrorResume(throwable -> {
log.error("Resuming");
return Mono.empty();
})
.doOnError(throwable -> log.error("Error on save"));
}
}
更新:根据请求,这是程序的最后一个输出,其中应该是“保存1131113条记录”,并带有 .log()
之前 .count()
(onnext之后的输出总是在进程之后打印,也在成功时打印):
"Saving... 1131113"
"| onNext([ConvertedItem(...),1131113])"
"Shutting down ExecutorService 'pubsubPublisherThreadPool'"
"Shutting down ExecutorService 'pubSubAcknowledgementExecutor'"
"Shutting down ExecutorService 'pubsubSubscriberThreadPool'"
"Closing JPA EntityManagerFactory for persistence unit 'default'"
"HikariPool-1 - Shutdown initiated..."
"HikariPool-1 - Shutdown completed."
暂无答案!
目前还没有任何答案,快来回答吧!