如果ForkJoinPool中的线程执行阻塞I/O活动(下面使用Thread.sleep(10000)
在代码中模拟),它应该选择另一个不会导致阻塞IO的任务。但是,在代码下面运行ForkJoinPool并行度为1或2的代码,结果与之不匹配。线程不会放弃阻塞任务(即task 1和task 2),并移动到非阻塞任务(task 3)。在池大小=1的情况下,所有任务都按顺序执行,而我希望Task 3首先完成,因为在这两种情况下(池大小=1或2)它都是非阻塞的
在阻塞IO的情况下,ForkJoinPool中的线程的实际行为是什么?
ForkJoinPool pool = new ForkJoinPool(1);
List<Future<String>> tasks = new ArrayList<Future<String>>();
//Blocking
tasks.add(pool.submit(() -> { Thread.sleep(10000); System.out.println("Task 1 woke up"+ Thread.currentThread().isDaemon()); return "task1"; }));
//Blocking
tasks.add(pool.submit(() -> { Thread.sleep(10000); System.out.println("Task 2 woke up"+Thread.currentThread().isDaemon()); return "task2"; }));
//Non Blocking
tasks.add(pool.submit(() -> { System.out.println("Task 3 Runs"+Thread.currentThread().isDaemon()); return "task3"; }));
int i=0;
System.out.println("pool size = " + pool.getParallelism() + "Thread count=" +pool.getPoolSize() + "Stealing =" + pool.getStealCount());
System.out.println("waiting");
String str1 = tasks.get(0).get();
String str2 = tasks.get(1).get();
String str3 = tasks.get(2).get();
System.out.println("Results = " + str1+ str2 +str3);
System.out.println("done");
结果:
//池大小= 2
任务2唤醒真
任务1唤醒真
任务3 Runstrue
//池大小= 1
任务1唤醒真
任务2唤醒真
任务3 Runstrue
另一个观察:
当我们首先提交Task 3(非阻塞)时,只有它首先被调用。
3条答案
按热度按时间k7fdbhmy1#
如果ForkJoinPool中的Thread执行阻塞I/O活动(下面使用Thread.sleep(10000)在代码中模拟),它应该选择另一个不会导致阻塞IO的任务。
这不是线程的行为方式。线程表示动态执行。它不能任意跳转到另一个
Runnable
。线程没有办法知道线程是否阻塞I/O或执行密集计算,* 除非 * 线程引擎和特定的I/O是为此设计的(例如:jdbc vs.r2dbc)。换句话说,如果你使用Reactor/Spring Reactive/其他一些具有传统阻塞I/O的响应式编程框架,如JDBC,你的线程仍然会挂起等待I/O。
阻塞I/O通常在轮询套接字时实现为循环,特定I/O外部的任何东西都无法知道这实际上是阻塞I/O。
据我所知,
ForkJoinPool
不是React式的,它将Runnables交给线程,就是这样。qgelzfjb2#
ForkJoinPool仅用作织机提案中提到的调度程序。只有虚拟线程能够放弃阻塞任务并切换到另一个任务。根据我在loom邮件列表中阅读到的一封邮件,loom团队目前不打算重新设计java.util.concurrent的ForkJoinPool来使用虚拟线程。
dgsult0t3#
ForkJoinPool
不能任意检测应该启动其他任务以保持并行性的情况,以及不应该启动其他任务的情况。因此,正在等待某些东西的工作项必须显式地告诉
ForkJoinPool
它应该启动其他任务以保持并行性。要实现这一点,您必须使用
ForkJoinPool#managedBlock
或任何使用它的API。例如,如果您使用
CompletableFuture#join
或CompletableFuture#get
,那么这些方法在后台使用ForkJoinPool#managedBlock
,从而允许ForkJoinPool
启动其他任务以保持预期的并行性。简单的
Thread.sleep
不适用于此。如果您在示例中将
Thread.sleep(10000);
替换为CompletableFuture.runAsync(() -> { try { Thread.sleep(10000); } catch (Exception e) {}; }).join();
,那么您将获得预期的行为。