Future 设计模式支持回调

x33g5p2x  于2022-04-20 转载在 其他  
字(2.9k)|赞(0)|评价(0)|浏览(257)

一 说明

在上一篇的基础上,增加回调功能。

二 核心代码

1 增加回调版本接口

package concurrent.future;

public interface FutureService<IN, OUT> {
    // 提交不需要返回值的任务,Future.get 方法返回的将会是 null
    Future<?> submit(Runnable runnable);

    // 提交需要返回值的任务,其中 Task 接口代替了 Runnable 接口
    Future<OUT> submit(Task<IN, OUT> task, IN input);

    // 增加回调机制,提高使用体验
    Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback);

    // 使用静态方法创建一个  FutureService 实例
    static <T, R> FutureService<T, R> newService() {
        return new FutureServiceImpl<>();
    }
}

2 实现回调版本

package concurrent.future;

import java.util.concurrent.atomic.AtomicInteger;

public class FutureServiceImpl<IN, OUT> implements FutureService<IN, OUT> {
    // 线程名字前缀
    private final static String FUTURE_THREAD_PREFIX = "FUTURE-";
    private final AtomicInteger nextCounter = new AtomicInteger(0);

    private String getNextName() {
        return FUTURE_THREAD_PREFIX + nextCounter.getAndIncrement();
    }

    @Override
    public Future<?> submit(Runnable runnable) {
        final FutureTask<Void> future = new FutureTask<>();
        new Thread(() -> {
            runnable.run();
            // 任务结束之后将 null 作为结果传给 future
            future.finish(null);
        }, getNextName()).start();
        return future;
    }

    @Override
    public Future<OUT> submit(Task<IN, OUT> task, IN input) {
        final FutureTask<OUT> future = new FutureTask<>();
        new Thread(() -> {
            OUT result = task.get(input);
            // 任务结束之后,将真实的结果通过 finish 方法传递给 future
            future.finish(result);
        }, getNextName()).start();
        return future;
    }

    // 使用静态方法创建一个  FutureService 实例
    @Override
    public Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback) {
        final FutureTask<OUT> future = new FutureTask<>();
        new Thread(() -> {
            OUT result = task.get(input);
            // 任务结束之后,将真实的结果通过 finish 方法传递给 future
            future.finish(result);
            // 执行回调接口
            if (null != callback) {
                callback.call(result);
            }
        }, getNextName()).start();
        return future;
    }
}

3 增加回调接口

package concurrent.future;

@FunctionalInterface
public interface Callback<T> {
    void call(T t);
}

4 测试代码

package concurrent.future;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        // 定义不需要返回值的 FutureService
        FutureService<Void, Void> service = FutureService.newService();
        Future<?> futrue = service.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("I am finish done");
        });
        // futrue 方法会使当前线程进入阻塞
        futrue.get();

        // 定义有需要返回值的 FutureService
        FutureService<String, Integer> service1 = FutureService.newService();
        Future<Integer> future1 = service1.submit(input ->
        {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return input.length();
        }, "hello");
        System.out.println(future1.get());

        // 回调版本
        FutureService<String, Integer> service2 = FutureService.newService();
        service2.submit(input ->
        {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return input.length();
        }, "hello", System.out::println);
    }
}

三 测试结果

I am finish done

5

5

创作挑战赛

新人创作奖励来咯,坚持创作打卡瓜分现金大奖

相关文章