通过 CompletableFuture 控制线程间依赖关系

x33g5p2x  于2022-05-11 转载在 其他  
字(3.6k)|赞(0)|评价(0)|浏览(224)

一 点睛

Future 接口可以创建出新的线程,并在新线程中执行异步操作。但是, 如果使用 Future 接口创建了多个线程,那么这些线程各自独立执行,不存在任务依赖关系,并且无法控制各个线程的执行步骤。

为了解决这种线程间的依赖关系,从 JDK 1.8 开始,Future 接口提供了一个新的实现类 CompletableFuture。CompletableFuture 不但可以创建出异步执行的线程,还可以控制线程的执行步骤,并且可以监控所有线程的结束时刻。例如,可以使用 CompletableFuture 创建 A 和 B 两个线程,然后规定 A 和 B 线程各需要执行 3个 不同的阶段,并且当 A和 B 全部执行完毕后,再触发某个方法。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                               Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }
}

其中,supplyAsync() 用于有返回值的任务,runAsync() 用于没有返回值的任务,

除了这些用于执行线程任务的方法外,CompletableFuture 还提供了多线程执行时的两种结束逻辑:allOf() 和 anyOf(),如下所述。
1 allOf() 方法会一直阻塞,直到线程池 Executor 中所有线程全部执行完毕。

2 anyOf() 方法会一直阻塞,直到线程池 Executor 中任何一个线程执行完毕。

二 实战

1 需求

有1、2、3、4 四个数字,现要求创建4个线程对这些数字进行处理,并要求每个线程必须按照以下步骤执行。

a 对4个数字的值各加上64,使之转为 A、B、C、D 对应的 ASCII(A:65、B:66、C:67、D:68)。

b 将 ASCII 值转为对应的 A、B、C、D 字符。

c 将转后的 A、B、C、D 加入一个集合中。

最后,还要求当 A、B、C、D 4个线程全部执行完毕后,打印出转换后的结果集。

2 代码

package concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 原始数据集
        CopyOnWriteArrayList<Integer> taskList = new CopyOnWriteArrayList();
        taskList.add(1);
        taskList.add(2);
        taskList.add(3);
        taskList.add(4);

        // 结果集
        List<Character> resultList = new ArrayList<>();
        //线程池,可容纳四个线程
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        CompletableFuture[] cfs = taskList.stream()
                // 第一阶段
                .map(integer -> CompletableFuture.supplyAsync(
                        () -> calcASCII(integer), executorService)
                        // 第二阶段
                        .thenApply(i -> {
                            char c = (char) (i.intValue());
                            System.out.println("【阶段2】线程"
                                    + Thread.currentThread().getName() + "执行完毕,"
                                    + "已将int"
                                    + i + "转为了字符" + c);
                            return c;
                        })
                        // 第三阶段
                        .whenComplete((ch, e) -> {
                            resultList.add(ch);
                            System.out.println("【阶段3】线程" +
                                    Thread.currentThread().getName() + "执行完毕," + "已将"
                                    + ch + "增加到了结果集" + resultList + "中");
                            executorService.shutdown();
                        })
                ).toArray(CompletableFuture[]::new);
        // 封装后无返回值,必须自己whenComplete()获取
        CompletableFuture.allOf(cfs).join();//future.get()

        System.out.println("完成!result=" + resultList);
    }

    // 计算i的ASCII值
    public static Integer calcASCII(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            //数字 -> A-D对应的ascii
            i = i + 64;
            System.out.println("【阶段1】线程" + Thread.currentThread().getName()
                    + "执行完毕," + "已将" + i
                    + "转为了A(或B或C或D)对应的ASCII" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}

3 测试结果

【阶段1】线程pool-1-thread-3执行完毕,已将67转为了A(或B或C或D)对应的ASCII67

【阶段1】线程pool-1-thread-4执行完毕,已将68转为了A(或B或C或D)对应的ASCII68

【阶段1】线程pool-1-thread-2执行完毕,已将66转为了A(或B或C或D)对应的ASCII66

【阶段2】线程pool-1-thread-3执行完毕,已将int67转为了字符C

【阶段2】线程pool-1-thread-4执行完毕,已将int68转为了字符D

【阶段2】线程pool-1-thread-2执行完毕,已将int66转为了字符B

【阶段3】线程pool-1-thread-3执行完毕,已将C增加到了结果集[C, D]中

【阶段3】线程pool-1-thread-4执行完毕,已将D增加到了结果集[C, D]中

【阶段3】线程pool-1-thread-2执行完毕,已将B增加到了结果集[C, D, B]中

【阶段1】线程pool-1-thread-1执行完毕,已将65转为了A(或B或C或D)对应的ASCII65

【阶段2】线程pool-1-thread-1执行完毕,已将int65转为了字符A

【阶段3】线程pool-1-thread-1执行完毕,已将A增加到了结果集[C, D, B, A]中

完成!result=[C, D, B, A]

相关文章