这怎么能在scala中同时完成

How can this be done concurrently in scala

所以我有这段代码

dbs.foreach({
  var map = scala.collection.mutable.Map[String, mutable.MutableList[String]]()
  db =>
    val resultList = getTables(hive, db)
    map+=(db -> resultList)
})

它的作用是循环遍历数据库列表,对每个数据库执行 show tables in db 调用,然后将数据库 -> table 添加到映射中。这怎么能同时完成,因为配置单元查询有大约 5 秒的等待时间 return?

更新代码--

def getAllTablesConcurrent(hive: JdbcHive, dbs: mutable.MutableList[String]): Map[String, mutable.MutableList[String]] = {
  implicit val context:ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
  val futures = dbs.map {
    db =>
        Future(db, getTables(hive, db))
    }
  val map = Await.result( Future.sequence(futures), Duration(10, TimeUnit.SECONDS) ).toMap
  map
}

您可以在任何 Scala 集合上使用 .par 来并行执行下一个转换(使用取决于内核数量的默认并行度)。

此外 - map 到(不可变)地图而不是更新可变地图更容易和更清晰。

val result = dbs.par.map(db => db -> getTables(hive, db)).toMap

要更好地控制使用的并发线程数,请参阅https://docs.scala-lang.org/overviews/parallel-collections/configuration.html

如果你想要更多的控制(你想等待多少时间,你想使用多少线程,如果你所有的线程都忙会发生什么,等等)你可以使用 ThreadPollExecutor 和 Future

  implicit val context:ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val dbs = List("db1", "db2", "db3")

  val futures = dbs.map {
   name => Future(name, getables(hive, name))
  }

  val result = Await.result( Future.sequence(futures), Duration(TIMEOUT, TimeUnit.MILLISECONDS) ).toMap

记住不要在每次需要时都创建一个新的 ExecutionContext

不要使用变量和可变状态,尤其是如果你想要并发。

 val result: Future[Map[String, Seq[String]] = Future
   .traverse(dbs) { name => 
       Future(name -> getTables(hive, name) )
   }.map(_.toMap)