Twitter 的 Future.collect 无法同时工作 (Scala)
Twitter's Future.collect not working concurrently (Scala)
来自 node.js 背景,我是 Scala 的新手,我尝试使用 Twitter 的 Future.collect 来执行一些简单的并发操作。但是我的代码显示顺序行为而不是并发行为。我做错了什么?
这是我的代码,
import com.twitter.util.Future
def waitForSeconds(seconds: Int, container:String): Future[String] = Future[String] {
Thread.sleep(seconds*1000)
println(container + ": done waiting for " + seconds + " seconds")
container + " :done waiting for " + seconds + " seconds"
}
def mainFunction:String = {
val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All")))
val singleTask = waitForSeconds(1, "Single")
allTasks onSuccess { res =>
println("All tasks succeeded with result " + res)
}
singleTask onSuccess { res =>
println("Single task succeeded with result " + res)
}
"Function Complete"
}
println(mainFunction)
这是我得到的输出,
All: done waiting for 1 seconds
All: done waiting for 3 seconds
All: done waiting for 2 seconds
Single: done waiting for 1 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete
我期望的输出是,
All: done waiting for 1 seconds
Single: done waiting for 1 seconds
All: done waiting for 2 seconds
All: done waiting for 3 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete
Twitter 的 futures 比 Scala 标准库 futures 更明确地说明了在哪里执行计算。特别是,Future.apply
将安全地捕获异常(如 s.c.Future
),但它没有说明计算将 运行 在哪个线程中。在你的情况下,计算是 运行在主线程中,这就是为什么你看到你看到的结果。
与标准库的未来相比,这种方法有几个优点 API。一方面,它使方法签名更简单,因为没有隐式的 ExecutionContext
必须到处传递。更重要的是,它可以更轻松地避免上下文切换 (here's a classic explanation by Brian Degenhardt). In this respect Twitter's Future
is more like Scalaz's Task
, and has essentially the same performance benefits (described for example in this blog post)。
更明确地说明在哪里计算 运行 的缺点是你必须更明确地说明在哪里计算 运行。在你的情况下你可以这样写:
import com.twitter.util.{ Future, FuturePool }
val pool = FuturePool.unboundedPool
def waitForSeconds(seconds: Int, container:String): Future[String] = pool {
Thread.sleep(seconds*1000)
println(container + ": done waiting for " + seconds + " seconds")
container + " :done waiting for " + seconds + " seconds"
}
这不会完全产生您要求的输出("Function complete" 将首先打印,并且 allTasks
和 singleTask
没有相对于彼此排序),但它会 运行 在单独的线程上并行执行任务。
(作为脚注:我上面示例中的 FuturePool.unboundedPool
是为演示创建未来池的一种简单方法,通常很好,但它不适合 CPU 密集型计算——请参阅 the FuturePool
API docs 以了解创建未来池的其他方法,该池将使用您提供并可以自行管理的 ExecutorService
。)
来自 node.js 背景,我是 Scala 的新手,我尝试使用 Twitter 的 Future.collect 来执行一些简单的并发操作。但是我的代码显示顺序行为而不是并发行为。我做错了什么?
这是我的代码,
import com.twitter.util.Future
def waitForSeconds(seconds: Int, container:String): Future[String] = Future[String] {
Thread.sleep(seconds*1000)
println(container + ": done waiting for " + seconds + " seconds")
container + " :done waiting for " + seconds + " seconds"
}
def mainFunction:String = {
val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All")))
val singleTask = waitForSeconds(1, "Single")
allTasks onSuccess { res =>
println("All tasks succeeded with result " + res)
}
singleTask onSuccess { res =>
println("Single task succeeded with result " + res)
}
"Function Complete"
}
println(mainFunction)
这是我得到的输出,
All: done waiting for 1 seconds
All: done waiting for 3 seconds
All: done waiting for 2 seconds
Single: done waiting for 1 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete
我期望的输出是,
All: done waiting for 1 seconds
Single: done waiting for 1 seconds
All: done waiting for 2 seconds
All: done waiting for 3 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete
Twitter 的 futures 比 Scala 标准库 futures 更明确地说明了在哪里执行计算。特别是,Future.apply
将安全地捕获异常(如 s.c.Future
),但它没有说明计算将 运行 在哪个线程中。在你的情况下,计算是 运行在主线程中,这就是为什么你看到你看到的结果。
与标准库的未来相比,这种方法有几个优点 API。一方面,它使方法签名更简单,因为没有隐式的 ExecutionContext
必须到处传递。更重要的是,它可以更轻松地避免上下文切换 (here's a classic explanation by Brian Degenhardt). In this respect Twitter's Future
is more like Scalaz's Task
, and has essentially the same performance benefits (described for example in this blog post)。
更明确地说明在哪里计算 运行 的缺点是你必须更明确地说明在哪里计算 运行。在你的情况下你可以这样写:
import com.twitter.util.{ Future, FuturePool }
val pool = FuturePool.unboundedPool
def waitForSeconds(seconds: Int, container:String): Future[String] = pool {
Thread.sleep(seconds*1000)
println(container + ": done waiting for " + seconds + " seconds")
container + " :done waiting for " + seconds + " seconds"
}
这不会完全产生您要求的输出("Function complete" 将首先打印,并且 allTasks
和 singleTask
没有相对于彼此排序),但它会 运行 在单独的线程上并行执行任务。
(作为脚注:我上面示例中的 FuturePool.unboundedPool
是为演示创建未来池的一种简单方法,通常很好,但它不适合 CPU 密集型计算——请参阅 the FuturePool
API docs 以了解创建未来池的其他方法,该池将使用您提供并可以自行管理的 ExecutorService
。)