Twitter 的 Future.collect 无法同时工作 (Scala)

Twitter's Future.collect not working concurrently (Scala)

来自 node.js 背景,我是 Scala 的新手,我尝试使用 Twitter 的 Future.collect 来执行一些简单的并发操作。但是我的代码显示顺序行为而不是并发行为。我做错了什么?

这是我的代码,

import com.twitter.util.Future

def waitForSeconds(seconds: Int, container:String): Future[String]  = Future[String] {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

def mainFunction:String = {
  val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All")))
  val singleTask = waitForSeconds(1, "Single")

  allTasks onSuccess  { res =>
    println("All tasks succeeded with result " + res)
  }

  singleTask onSuccess { res =>
    println("Single task succeeded with result " + res)
  }

  "Function Complete"
}

println(mainFunction)

这是我得到的输出,

All: done waiting for 1 seconds
All: done waiting for 3 seconds
All: done waiting for 2 seconds
Single: done waiting for 1 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete

我期望的输出是,

All: done waiting for 1 seconds
Single: done waiting for 1 seconds
All: done waiting for 2 seconds
All: done waiting for 3 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete

Twitter 的 futures 比 Scala 标准库 futures 更明确地说明了在哪里执行计算。特别是,Future.apply 将安全地捕获异常(如 s.c.Future),但它没有说明计算将 运行 在哪个线程中。在你的情况下,计算是 运行在主线程中,这就是为什么你看到你看到的结果。

与标准库的未来相比,这种方法有几个优点 API。一方面,它使方法签名更简单,因为没有隐式的 ExecutionContext 必须到处传递。更重要的是,它可以更轻松地避免上下文切换 (here's a classic explanation by Brian Degenhardt). In this respect Twitter's Future is more like Scalaz's Task, and has essentially the same performance benefits (described for example in this blog post)。

更明确地说明在哪里计算 运行 的缺点是你必须更明确地说明在哪里计算 运行。在你的情况下你可以这样写:

import com.twitter.util.{ Future, FuturePool }

val pool = FuturePool.unboundedPool

def waitForSeconds(seconds: Int, container:String): Future[String] = pool {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

这不会完全产生您要求的输出("Function complete" 将首先打印,并且 allTaskssingleTask 没有相对于彼此排序),但它会 运行 在单独的线程上并行执行任务。

(作为脚注:我上面示例中的 FuturePool.unboundedPool 是为演示创建未来池的一种简单方法,通常很好,但它不适合 CPU 密集型计算——请参阅 the FuturePool API docs 以了解创建未来池的其他方法,该池将使用您提供并可以自行管理的 ExecutorService。)