java CompletableFuture未等待子线程

mcvgt66p  于 2023-01-24  发布在  Java
关注(0)|答案(1)|浏览(180)

我尝试等待processor.processFiles()完成,该方法返回void,并且它是一个@Async方法。忙碌等待逻辑不会导致进程等待方法完成。有人能指出我遗漏了什么吗?

try{

    filesList.forEach(files -> {
        List<CompletableFuture<Void>> completableFutures  = new ArrayList<>();

        files.forEach(file-> {
            CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> 
                processor.processFiles());
            completableFutures.add(completableFuture);
        });
        while(true) {

            Thread.sleep(5000);
            boolean isComplete = completableFutures.stream().allMatch(result -> result.isDone() == true);

            if(isComplete){
                break;
            }
            LOGGER.info("processing the file...");
        }
    });
} 
catch(Exception e){

}
finally{
    closeConnections();
}
wlzqhblo

wlzqhblo1#

我觉得你把事情想得太复杂了。

fileList.flatMap(List::stream).parallel().forEach(file -> processor.processFiles());

forEach将并行运行,并在处理完所有文件后返回。
至少,不要使用副作用来填充List

List<CompletableFuture<Void>> completableFutures  = files.stream().map(
    file ->  CompletableFuture.runAsync(() -> processor.processFiles());
).collect( Collectors.toList());

我同意这个评论。

CompletableFuture<Void> all = CompletableFuture.allOf( completableFutures );

然后可以使用get,它将一直等到任务完成。
另一种方法是跳过List + CompletableFuture.allOf,只返回一个可完成的future。

CompletableFuture<Void> all = files.stream().map(
        file ->  CompletableFuture.runAsync(
            () -> processor.processFiles()
        )
    ).collect( 
        Collectors.reducing( 
           CompletableFuture.completedFuture(null), CompletableFuture::allOf
        )
    );

这将把fileMap到一个CompletableFuture,然后把所有得到的可完成future合并成一个单独的可完成future,你可以调用.get,它会在所有事情完成后返回。

相关问题