completablefuture.alloff完成,即使其列表中的一个completablefuture尚未完成

snz8szmq  于 2021-07-06  发布在  Java
关注(0)|答案(2)|浏览(442)

我有两个完整的未来。task2只能在task1完成后启动。然后,我需要等待所有任务完成。在我下面的代码中,程序在task1结束后结束。任务2开始,但没有完成。你知道为什么会这样吗?另外,为什么列表只包含1个条目,而在代码中,我添加了2个条目?
代码:

public void testFutures () throws Exception {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    CompletableFuture<Void> task1 = CompletableFuture.supplyAsync( () -> {
      System.out.println(" task1 start");
      try {
        Thread.sleep(5000L);
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
      System.out.println(" task1 done");
      return null;

    });

    task1.whenComplete( (x, y) -> {
      CompletableFuture<Void> task2 = CompletableFuture.supplyAsync( () -> {
        System.out.println(" task2 start");
        try {
          Thread.sleep(2000L);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
        System.out.println(" task2 done");
        return null;
      });
      futures.add(task2);
    });
    futures.add(task1);
    // wait for the calls to finish
    try {
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete( (x, y) -> {
        System.out.println(" all tasks done " + futures.size());
      }).get();

    } catch (Exception e) {
      e.printStackTrace();
    }
  }

输出:

task1 start
 task1 done
 all tasks done 1
 task2 start
2cmtqfgy

2cmtqfgy1#

你有两个问题。
首先,你已经创建了一个关于什么时候比赛的条件 task2 被添加到你的未来列表中。在你执行这行的时候-

CompletableFuture.allOf(...).get();

-我称之为终结者,你只有 task1 在列表中。通过输出其大小自行查看:

// wait for the calls to finish
try {
    System.out.println("# of futures: " + futures.size()); // 1
``` `task2` 最终仍会运行,因为您使用 `whenComplete()` . 但触发它的不是你的终结者。
记得我说过这是比赛条件。要亲自演示这一点,请添加 `sleep()` 在结束吸气剂之前,像这样:

try {
Thread.sleep(6000L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
// wait for the calls to finish
try {
System.out.println("# of futures: " + futures.size()); // 2

那你就有足够的时间 `task2` .
但事情是这样的。现在终止getter是否触发这两个任务?
还是不行!这就是第二个问题:你几乎总是想使用 `thenRun()` ,  `thenAccept()` ,  `thenApply()` ,  `thenCompose()` 方法。这些方法链接您的未来,即使每个阶段依赖于前一个阶段,这样您的终止getter实际上会等待整个链完成。 `whenComplete()` 是一种特殊的方法,它启动一个完全不相关的管道,因此不受终止管道的影响 `get()` .
在你的情况下,你想用 `thenRun()` ,如下所示:

task1.thenRun( ignore -> {

好吧,那我们怎么把这些结合起来呢?

public static void testFutures () throws Exception {

CompletableFuture<Void> task1 = CompletableFuture.supplyAsync( () -> {
  System.out.println(" task1 start");
  try {
    Thread.sleep(5000L);
  } catch (InterruptedException ex) {
    ex.printStackTrace();
  }
  System.out.println(" task1 done");
  return null;
});

CompletableFuture<Void> futuresChain = task1.thenRun( () -> {
  System.out.println(" task2 start");
  try {
    Thread.sleep(2000L);
  } catch (InterruptedException ex) {
    ex.printStackTrace();
  }
  System.out.println(" task2 done");
});

// wait for the calls to finish
try {
  futuresChain.thenRun( () -> {
    System.out.println(" all tasks done ");
  }).toCompletableFuture().get();
} catch (Exception e) {
  e.printStackTrace();
}

}

输出:

task1 start
task1 done
task2 start
task2 done
all tasks done

你看,你只需要 `supplyAsync()` 第一个任务。你想跑吗 `task2` 在那个任务之后,所以 `thenRun()` 我来安排时间 `supplyAsync()` 为了你。所以你也不需要一系列的未来。这个 `allOf()` 用于您希望并行运行任务并等待所有任务完成的情况。
643ylb08

643ylb082#

让我们先清理代码。
让我们定义一个方法来进行睡眠,这样它就不会弄脏水了:

private static void sleep(int seconds) {
    try {
        Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
    }
}

然后,让我们将任务分开并使用适当的方法:

private static CompletableFuture<Void> task1() {

    return CompletableFuture.runAsync(() -> {
        System.out.println(" task1 start");
        sleep(5);
        System.out.println(" task1 done");
    });
}

private static CompletableFuture<Void> task2() {
    return CompletableFuture.runAsync(() -> {
        System.out.println(" task2 start");
        sleep(2);
        System.out.println(" task2 done");
    });
}

你得明白 CompletableFuture 方法已经完全按照您的要求执行了,它们将在上一个阶段结束后运行下一个阶段。您可以使用以下工具使代码更加简单:

public static void main(String[] args) throws Exception {
    testFutures();
}

private static void testFutures() throws Exception {

    CompletableFuture<Void> both = task1().thenCompose(ignoreMe -> task2());
    both.get();
    System.out.println("both done");

}

相关问题