Scala FixedThreadPool 停止启动线程,即使之前的线程已经完成工作

Scala FixedThreadPool stops launching threads even though the previous ones have finished their job

有个问题折磨了我好几天

我有一个固定大小的 FixedThreadPool(为简单起见,假设为 10),我为其分配了 275 个 Runnable:

Logging.log("----- Thread Pool creation -----")
val pool = Executors.newFixedThreadPool(num_of_threads)
Logging.log("> submitting tables")
for(t <- jobTablesList) {
  val db = t(0)
  val tb = t(1)
  val st = t(2).toInt
  pool.submit(new DataMigration( db, tb, st, retry_times, file_threads))
}

pool.shutdown()
pool.awaitTermination(10000, TimeUnit.HOURS)
Logging.log("----- DATA MIGRATION END! -----")

我确定它是 275,因为我先打印作业的长度TablesList。 DataMigration class 扩展了 Runnable,即 运行 方法:

override def run(): Unit = {
  Logging.log("Migration start\n\ttable: " + table + "\n\tdatabase: " + database + "\n\tstatus: " + status)

  try {
    if (status == 0) createTable()
    else readTable()

    if(!isTableView) {
      if (status == 1) migrateTable()
      if (status == 2) validateTable()
    }
  }
  catch {
    case e: Throwable => migrationError(e)
  }

  Logging.log("Migration end\n\ttable:\t" + table + "\n\tdatabase:\t" + database + "\n\tstatus:\t" + status)

}

如您所见,我确切地知道线程何时开始和完成其工作,只是因为它在日志中打印了此信息。

问题是在某些时候池停止分配任务,而队列中剩余的任务继续处理并完成,不再添加。

例如在上一个 运行 中脚本卡在了这个状态 (num_of_threads = 15):

为什么池不调度其他线程? cpu 使用率很低,内存正常

有人能帮我理解一下吗?

谢谢!

这是因为您在所有任务处理完成之前调用了pool.shutdown()。您需要找到一种仅在所有任务处理完毕后才调用关闭的方法。

过了一段时间我弄明白了...

基本上我在两个不同的文件中定义了 DataMigration class,它们在两个不同的对象(当然有不同的名称)中相互不交互,Scala 显然不喜欢它但没有传达它

它只是在一段时间后随机停止提交线程

我删除了第二个文件,它目前继续以预期的方式工作