虽然 Thread 为我们提供了可获取状态,以及判断是否 alive 的方法,但是这样方法都是针对线程本身的,而针对我们提交的任务,它的运行状态我们是无法直接获取的,比如何时开始,何时结束,最不好的一种体验是无法获得任务执行后的结果。一般情况下,想要获得最终结果,我们不得不为 Thread 或 Runnable 传入共享变量,但在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致性的安全隐患。
我们需要一种机制,可观察线程的运行状态,或者说需要监控任务的生命周期。
当某个对象发生状态改变需要通知第三方的时候,观察者模式就特别适合胜任这样的工作。观察者模式需要有事件源,也就是引发状态改变的源头,Thread 负责执行任务的逻辑单元,它最清楚整个过程的周期,事件发生后,只需要通知接受的一方。
package concurrent.observableThread;
/**
* @className: Observable
* @description: 暴露给使用者使用
* @date: 2022/4/12
* @author: cakin
*/
public interface Observable {
// 任务生命周期的枚举类型
enum Cycle{
STARTED,RUNNING,DONE,ERROR
}
// 获取当前任务的生命周期
Cycle getCycle();
// 定义启动线程的方法
void start();
// 定义线程的打断方法
void interrupt();
}
package concurrent.observableThread;
/**
* @className: TaskLifecycle
* @description: 在任务执行的生命周期中会被触发
* @date: 2022/4/12
* @author: 贝医
*/
public interface TaskLifecycle<T> {
// 任务启动时会触发 onStart 方法
void onStart(Thread thread);
// 任务正在运行时会触发 onRunning 方法
void onRunning(Thread thread);
// 任务结束时会触发 onFinish 方法,其中 result 是任务执行结束后的结果
void onFinish(Thread thread, T result);
// 任务报错时会触发 onError 方法
void onError(Thread thread, Exception e);
// 生命周期接口的默认实现
class DefaultLifecycle<T> implements TaskLifecycle<T> {
@Override
public void onStart(Thread thread) {
System.out.println("task is STARTED");
}
@Override
public void onRunning(Thread thread) {
System.out.println("task is RUNNING");
}
@Override
public void onFinish(Thread thread, T result) {
System.out.println("task is DONE");
}
@Override
public void onError(Thread thread, Exception e) {
System.out.println("task is ERROR");
}
}
// 生命周期接口的空实现
class EmptyLifecycle<T> implements TaskLifecycle<T> {
@Override
public void onStart(Thread thread) {
}
@Override
public void onRunning(Thread thread) {
}
@Override
public void onFinish(Thread thread, T result) {
}
@Override
public void onError(Thread thread, Exception e) {
}
}
}
package concurrent.observableThread;
@FunctionalInterface
public interface Task<T> {
// 任务执行接口,该接口允许有返回值
T call();
}
package concurrent.observableThread;
public class ObservableThread<T> extends Thread implements Observable {
private final TaskLifecycle<T> lifecycle;
private final Task<T> task;
private Cycle cycle;
public ObservableThread(Task<T> task) {
this(new TaskLifecycle.DefaultLifecycle<>(), task);
}
public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) {
super();
// task 不允许为空
if (task == null) {
throw new IllegalArgumentException("The task is required");
}
this.lifecycle = lifecycle;
this.task = task;
}
private void update(Cycle cycle, T result, Exception e) {
this.cycle = cycle;
if (lifecycle == null) {
return;
}
try {
switch (cycle) {
case STARTED:
this.lifecycle.onStart(currentThread());
break;
case RUNNING:
this.lifecycle.onRunning(currentThread());
break;
case DONE:
this.lifecycle.onFinish(currentThread(), result);
break;
case ERROR:
this.lifecycle.onError(currentThread(), e);
break;
}
} catch (Exception exception) {
if (cycle == Cycle.ERROR) {
throw exception;
}
}
}
@Override
public void run() {
// 在执行线程逻辑单元的时候,分别触发相应的事件
this.update(Cycle.STARTED, null, null);
try {
this.update(Cycle.RUNNING, null, null);
T result = this.task.call();
this.update(Cycle.DONE, result, null);
} catch (Exception e) {
this.update(Cycle.ERROR, null, e);
}
}
@Override
public Cycle getCycle() {
return this.cycle;
}
}
package concurrent.observableThread;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) {
Observable observable = new ObservableThread<>(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("some task.");
return null;
});
observable.start();
}
}
测试结果为
task is STARTED
task is RUNNING
some task.
task is DONE
package concurrent.observableThread;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
TaskLifecycle.DefaultLifecycle<String> lifecyelc = new TaskLifecycle.DefaultLifecycle<String>() {
@Override
public void onFinish(Thread thread, String result) {
System.out.println("The result is" + result);
}
};
Observable observable = new ObservableThread<>(lifecyelc, () -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("some task.");
return " Hello Observer";
});
observable.start();
}
}
测试结果为
task is STARTED
task is RUNNING
some task.
The result is Hello Observer
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/chengqiuming/article/details/124133323
内容来源于网络,如有侵权,请联系作者删除!