Scala FixedThreadPool 停止启动线程,即使之前的线程已经完成工作
Scala FixedThreadPool stops launching threads even though the previous ones have finished their job
有个问题折磨了我好几天
我有一个固定大小的 FixedThreadPool(为简单起见,假设为 10),我为其分配了 275 个 Runnable:
Logging.log("----- Thread Pool creation -----")
val pool = Executors.newFixedThreadPool(num_of_threads)
Logging.log("> submitting tables")
for(t <- jobTablesList) {
val db = t(0)
val tb = t(1)
val st = t(2).toInt
pool.submit(new DataMigration( db, tb, st, retry_times, file_threads))
}
pool.shutdown()
pool.awaitTermination(10000, TimeUnit.HOURS)
Logging.log("----- DATA MIGRATION END! -----")
我确定它是 275,因为我先打印作业的长度TablesList。 DataMigration class 扩展了 Runnable,即 运行 方法:
override def run(): Unit = {
Logging.log("Migration start\n\ttable: " + table + "\n\tdatabase: " + database + "\n\tstatus: " + status)
try {
if (status == 0) createTable()
else readTable()
if(!isTableView) {
if (status == 1) migrateTable()
if (status == 2) validateTable()
}
}
catch {
case e: Throwable => migrationError(e)
}
Logging.log("Migration end\n\ttable:\t" + table + "\n\tdatabase:\t" + database + "\n\tstatus:\t" + status)
}
如您所见,我确切地知道线程何时开始和完成其工作,只是因为它在日志中打印了此信息。
问题是在某些时候池停止分配任务,而队列中剩余的任务继续处理并完成,不再添加。
例如在上一个 运行 中脚本卡在了这个状态 (num_of_threads = 15):
- Table 开始: 188
- Table 结束: 188
为什么池不调度其他线程? cpu 使用率很低,内存正常
有人能帮我理解一下吗?
谢谢!
这是因为您在所有任务处理完成之前调用了pool.shutdown()
。您需要找到一种仅在所有任务处理完毕后才调用关闭的方法。
过了一段时间我弄明白了...
基本上我在两个不同的文件中定义了 DataMigration class,它们在两个不同的对象(当然有不同的名称)中相互不交互,Scala 显然不喜欢它但没有传达它
它只是在一段时间后随机停止提交线程
我删除了第二个文件,它目前继续以预期的方式工作
有个问题折磨了我好几天
我有一个固定大小的 FixedThreadPool(为简单起见,假设为 10),我为其分配了 275 个 Runnable:
Logging.log("----- Thread Pool creation -----")
val pool = Executors.newFixedThreadPool(num_of_threads)
Logging.log("> submitting tables")
for(t <- jobTablesList) {
val db = t(0)
val tb = t(1)
val st = t(2).toInt
pool.submit(new DataMigration( db, tb, st, retry_times, file_threads))
}
pool.shutdown()
pool.awaitTermination(10000, TimeUnit.HOURS)
Logging.log("----- DATA MIGRATION END! -----")
我确定它是 275,因为我先打印作业的长度TablesList。 DataMigration class 扩展了 Runnable,即 运行 方法:
override def run(): Unit = {
Logging.log("Migration start\n\ttable: " + table + "\n\tdatabase: " + database + "\n\tstatus: " + status)
try {
if (status == 0) createTable()
else readTable()
if(!isTableView) {
if (status == 1) migrateTable()
if (status == 2) validateTable()
}
}
catch {
case e: Throwable => migrationError(e)
}
Logging.log("Migration end\n\ttable:\t" + table + "\n\tdatabase:\t" + database + "\n\tstatus:\t" + status)
}
如您所见,我确切地知道线程何时开始和完成其工作,只是因为它在日志中打印了此信息。
问题是在某些时候池停止分配任务,而队列中剩余的任务继续处理并完成,不再添加。
例如在上一个 运行 中脚本卡在了这个状态 (num_of_threads = 15):
- Table 开始: 188
- Table 结束: 188
为什么池不调度其他线程? cpu 使用率很低,内存正常
有人能帮我理解一下吗?
谢谢!
这是因为您在所有任务处理完成之前调用了pool.shutdown()
。您需要找到一种仅在所有任务处理完毕后才调用关闭的方法。
过了一段时间我弄明白了...
基本上我在两个不同的文件中定义了 DataMigration class,它们在两个不同的对象(当然有不同的名称)中相互不交互,Scala 显然不喜欢它但没有传达它
它只是在一段时间后随机停止提交线程
我删除了第二个文件,它目前继续以预期的方式工作