我有一个困扰我好几天的问题
我有一个固定大小的fixedthreadpool(为简单起见,假设为10),我将275runnable分配给它:
Logging.log("----- Thread Pool creation -----")
val pool = Executors.newFixedThreadPool(num_of_threads)
Logging.log("> submitting tables")
for(t <- jobTablesList) {
val db = t(0)
val tb = t(1)
val st = t(2).toInt
pool.submit(new DataMigration( db, tb, st, retry_times, file_threads))
}
pool.shutdown()
pool.awaitTermination(10000, TimeUnit.HOURS)
Logging.log("----- DATA MIGRATION END! -----")
我肯定是275,因为我先打印工作表列表的长度。datamigration类扩展了runnable,即run方法:
override def run(): Unit = {
Logging.log("Migration start\n\ttable: " + table + "\n\tdatabase: " + database + "\n\tstatus: " + status)
try {
if (status == 0) createTable()
else readTable()
if(!isTableView) {
if (status == 1) migrateTable()
if (status == 2) validateTable()
}
}
catch {
case e: Throwable => migrationError(e)
}
Logging.log("Migration end\n\ttable:\t" + table + "\n\tdatabase:\t" + database + "\n\tstatus:\t" + status)
}
如您所见,我确切地知道线程何时开始和完成其工作,这仅仅是因为它在日志中打印这些信息。
问题是,在某个时刻,池停止分配任务,而队列中剩余的任务继续被处理和完成,不再添加。
例如,在上一次运行中,脚本陷入这种状态(num \u of \u threads=15):
表开始:188
结束表格:188
为什么池不调度其他线程?cpu使用率很低,ram正常
有人能帮我理解吗?
谢谢!
1条答案
按热度按时间slmsl1lt1#
是因为你打电话来
pool.shutdown()
在处理所有任务之前。只有在处理完所有任务后,才需要找到调用shutdown的方法。