在上一篇的基础上,增加回调功能。
package concurrent.future;
public interface FutureService<IN, OUT> {
// 提交不需要返回值的任务,Future.get 方法返回的将会是 null
Future<?> submit(Runnable runnable);
// 提交需要返回值的任务,其中 Task 接口代替了 Runnable 接口
Future<OUT> submit(Task<IN, OUT> task, IN input);
// 增加回调机制,提高使用体验
Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback);
// 使用静态方法创建一个 FutureService 实例
static <T, R> FutureService<T, R> newService() {
return new FutureServiceImpl<>();
}
}
package concurrent.future;
import java.util.concurrent.atomic.AtomicInteger;
public class FutureServiceImpl<IN, OUT> implements FutureService<IN, OUT> {
// 线程名字前缀
private final static String FUTURE_THREAD_PREFIX = "FUTURE-";
private final AtomicInteger nextCounter = new AtomicInteger(0);
private String getNextName() {
return FUTURE_THREAD_PREFIX + nextCounter.getAndIncrement();
}
@Override
public Future<?> submit(Runnable runnable) {
final FutureTask<Void> future = new FutureTask<>();
new Thread(() -> {
runnable.run();
// 任务结束之后将 null 作为结果传给 future
future.finish(null);
}, getNextName()).start();
return future;
}
@Override
public Future<OUT> submit(Task<IN, OUT> task, IN input) {
final FutureTask<OUT> future = new FutureTask<>();
new Thread(() -> {
OUT result = task.get(input);
// 任务结束之后,将真实的结果通过 finish 方法传递给 future
future.finish(result);
}, getNextName()).start();
return future;
}
// 使用静态方法创建一个 FutureService 实例
@Override
public Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback) {
final FutureTask<OUT> future = new FutureTask<>();
new Thread(() -> {
OUT result = task.get(input);
// 任务结束之后,将真实的结果通过 finish 方法传递给 future
future.finish(result);
// 执行回调接口
if (null != callback) {
callback.call(result);
}
}, getNextName()).start();
return future;
}
}
package concurrent.future;
@FunctionalInterface
public interface Callback<T> {
void call(T t);
}
package concurrent.future;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
// 定义不需要返回值的 FutureService
FutureService<Void, Void> service = FutureService.newService();
Future<?> futrue = service.submit(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("I am finish done");
});
// futrue 方法会使当前线程进入阻塞
futrue.get();
// 定义有需要返回值的 FutureService
FutureService<String, Integer> service1 = FutureService.newService();
Future<Integer> future1 = service1.submit(input ->
{
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return input.length();
}, "hello");
System.out.println(future1.get());
// 回调版本
FutureService<String, Integer> service2 = FutureService.newService();
service2.submit(input ->
{
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return input.length();
}, "hello", System.out::println);
}
}
I am finish done
5
5
创作挑战赛
新人创作奖励来咯,坚持创作打卡瓜分现金大奖
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/chengqiuming/article/details/124282361
内容来源于网络,如有侵权,请联系作者删除!