长流量有时不完整

5anewei6  于 2021-06-29  发布在  Java
关注(0)|答案(0)|浏览(260)

我需要将一些项目从非React性存储库转移到React性存储库(firestore)。
该过程是从netty公开的rest端点触发的。下面的代码是我经过一些尝试和错误后编写的。
非React式回购的查询时间不长(约20秒),但返回大量记录,执行时间通常约60分钟。
所有记录都会被保存,所有的“正在保存…”。。。打印“”,但如果次数超过50%,则不会打印“保存的记录”,也不会打印错误。
我注意到:
更多记录->更高的失败概率
它不依赖于执行时间(有时比失败的进程完成的时间长)
该应用程序运行在k8s吊舱上,ram请求为1500英里,极限为3000英里,从图表上看,它从未接近极限。
我错过了什么?

@Slf4j
@RestController
@RequestMapping("/import")
public class ImportController {

    @Autowired
    private NotReactiveRepository notReactiveRepository;
    @Autowired
    private ReactiveRepository reactiveRepository;

    private static final Scheduler queryScheduler = Schedulers.newBoundedElastic(1, 480, "query", 864000);// max 10 days processing time

    @GetMapping("/start")
    public Mono<String> start() {
        log.info("Start");
        return Mono.just("RECEIVED")
                //fire and forget
                .doOnNext(stringRouteResponse -> startProcess().subscribe());
    }

    private Mono<Long> startProcess() {
        Mono<List<Items>> resultsBlockingMono = Mono
                .fromCallable(() -> notReactiveRepository.findAll())
                .subscribeOn(queryScheduler)
                .retryWhen(Retry.backoff(5, Duration.of(2, ChronoUnit.SECONDS)));
        return resultsBlockingMono
                .doOnNext( records -> log.info("Records: {}", records.size()))
                .flatMapMany(Flux::fromIterable)
                .map(ItemConverter::convert)
                // max 9000 save/sec
                .delayElements(Duration.of(300, ChronoUnit.MICROS))
                .flatMap(this::saveConvertedItem)
                .zipWith(Flux.range(1, Integer.MAX_VALUE))
                .doOnNext(savedAndIndex -> log.info("Saving in progress... {}", savedAndIndex.getT2()))
                .count()
                .doOnNext( numberOfSaved -> log.info("Saved {} records", numberOfSaved));
    }

    private Mono<ConvertedItem> saveConvertedItem(ConvertedItem convertedItem) {
        return reactiveRepository.save(convertedItem)
                .retryWhen(Retry.backoff(1000, Duration.of(2, ChronoUnit.MILLIS)))
                .onErrorResume(throwable -> {
                    log.error("Resuming");
                    return Mono.empty();
                })
                .doOnError(throwable -> log.error("Error on save"));
    }
}

更新:根据请求,这是程序的最后一个输出,其中应该是“保存1131113条记录”,并带有 .log() 之前 .count() (onnext之后的输出总是在进程之后打印,也在成功时打印):

"Saving... 1131113"
"| onNext([ConvertedItem(...),1131113])"
"Shutting down ExecutorService 'pubsubPublisherThreadPool'"
"Shutting down ExecutorService 'pubSubAcknowledgementExecutor'"
"Shutting down ExecutorService 'pubsubSubscriberThreadPool'"
"Closing JPA EntityManagerFactory for persistence unit 'default'"
"HikariPool-1 - Shutdown initiated..."
"HikariPool-1 - Shutdown completed."

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题