监控任务的生命周期

x33g5p2x  于2022-04-13 转载在 其他  
字(4.1k)|赞(0)|评价(0)|浏览(349)

一 点睛

虽然 Thread 为我们提供了可获取状态,以及判断是否 alive 的方法,但是这样方法都是针对线程本身的,而针对我们提交的任务,它的运行状态我们是无法直接获取的,比如何时开始,何时结束,最不好的一种体验是无法获得任务执行后的结果。一般情况下,想要获得最终结果,我们不得不为 Thread 或 Runnable 传入共享变量,但在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致性的安全隐患。

我们需要一种机制,可观察线程的运行状态,或者说需要监控任务的生命周期。

当某个对象发生状态改变需要通知第三方的时候,观察者模式就特别适合胜任这样的工作。观察者模式需要有事件源,也就是引发状态改变的源头,Thread 负责执行任务的逻辑单元,它最清楚整个过程的周期,事件发生后,只需要通知接受的一方。

二 实战

1 Observable

package concurrent.observableThread;

/**
* @className: Observable
* @description: 暴露给使用者使用
* @date: 2022/4/12
* @author: cakin
*/
public interface Observable {
    // 任务生命周期的枚举类型
    enum Cycle{
        STARTED,RUNNING,DONE,ERROR
    }
    // 获取当前任务的生命周期
    Cycle getCycle();
    // 定义启动线程的方法
    void start();
    // 定义线程的打断方法
    void interrupt();
}

2 TaskLifecycle

package concurrent.observableThread;

/**
* @className: TaskLifecycle
* @description: 在任务执行的生命周期中会被触发
* @date: 2022/4/12
* @author: 贝医
*/
public interface TaskLifecycle<T> {
    // 任务启动时会触发 onStart 方法
    void onStart(Thread thread);

    // 任务正在运行时会触发 onRunning 方法
    void onRunning(Thread thread);

    // 任务结束时会触发 onFinish 方法,其中 result 是任务执行结束后的结果
    void onFinish(Thread thread, T result);

    // 任务报错时会触发 onError 方法
    void onError(Thread thread, Exception e);

    // 生命周期接口的默认实现
    class DefaultLifecycle<T> implements TaskLifecycle<T> {
        @Override
        public void onStart(Thread thread) {
            System.out.println("task is STARTED");
        }

        @Override
        public void onRunning(Thread thread) {
            System.out.println("task is RUNNING");
        }

        @Override
        public void onFinish(Thread thread, T result) {
            System.out.println("task is DONE");
        }

        @Override
        public void onError(Thread thread, Exception e) {
            System.out.println("task is ERROR");
        }
    }

    // 生命周期接口的空实现
    class EmptyLifecycle<T> implements TaskLifecycle<T> {
        @Override
        public void onStart(Thread thread) {
        }

        @Override
        public void onRunning(Thread thread) {
        }

        @Override
        public void onFinish(Thread thread, T result) {
        }

        @Override
        public void onError(Thread thread, Exception e) {
        }
    }
}

3 Task

package concurrent.observableThread;

@FunctionalInterface
public interface Task<T> {
    // 任务执行接口,该接口允许有返回值
    T call();
}

4 ObservableThread

package concurrent.observableThread;

public class ObservableThread<T> extends Thread implements Observable {
    private final TaskLifecycle<T> lifecycle;
    private final Task<T> task;
    private Cycle cycle;

    public ObservableThread(Task<T> task) {
        this(new TaskLifecycle.DefaultLifecycle<>(), task);
    }

    public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) {
        super();
        // task 不允许为空
        if (task == null) {
            throw new IllegalArgumentException("The task is required");
        }
        this.lifecycle = lifecycle;
        this.task = task;
    }

    private void update(Cycle cycle, T result, Exception e) {
        this.cycle = cycle;
        if (lifecycle == null) {
            return;
        }
        try {
            switch (cycle) {
                case STARTED:
                    this.lifecycle.onStart(currentThread());
                    break;
                case RUNNING:
                    this.lifecycle.onRunning(currentThread());
                    break;
                case DONE:
                    this.lifecycle.onFinish(currentThread(), result);
                    break;
                case ERROR:
                    this.lifecycle.onError(currentThread(), e);
                    break;
            }
        } catch (Exception exception) {
            if (cycle == Cycle.ERROR) {
                throw exception;
            }
        }
    }

    @Override
    public void run() {
        // 在执行线程逻辑单元的时候,分别触发相应的事件
        this.update(Cycle.STARTED, null, null);
        try {
            this.update(Cycle.RUNNING, null, null);
            T result = this.task.call();
            this.update(Cycle.DONE, result, null);
        } catch (Exception e) {
            this.update(Cycle.ERROR, null, e);
        }
    }

    @Override
    public Cycle getCycle() {
        return this.cycle;
    }
}

三 测试

1 当任务无返回值

package concurrent.observableThread;

import java.util.concurrent.TimeUnit;

public class Test1 {
    public static void main(String[] args) {
        Observable observable = new ObservableThread<>(() -> {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("some task.");
            return null;
        });
        observable.start();
    }
}

测试结果为

task is STARTED

task is RUNNING

some task.

task is DONE

2 当任务有返回值

package concurrent.observableThread;

import java.util.concurrent.TimeUnit;

public class Test2 {
    public static void main(String[] args) {
        TaskLifecycle.DefaultLifecycle<String> lifecyelc = new TaskLifecycle.DefaultLifecycle<String>() {
            @Override
            public void onFinish(Thread thread, String result) {
                System.out.println("The result is" + result);
            }
        };

        Observable observable = new ObservableThread<>(lifecyelc, () -> {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("some task.");
            return " Hello Observer";
        });
        observable.start();
    }
}

测试结果为

task is STARTED

task is RUNNING

some task.

The result is Hello Observer

相关文章