理解与 Future.sequence
for-comprehension vs Future.sequence
我的机器上有 4 个内核。
而且我认为 for-comprehension 可以并行安排内容(不依赖于之前的结果..与 flatMap 相同):
val stuffResult: Future[String] = for {
stuff1 <- service1.doServiceStuff("service 1") // worker-5 (4+1)
stuff2 <- service1.doServiceStuff("service 2")
stuff3 <- service1.doServiceStuff("service 3")
stuff4 <- service1.doServiceStuff("service 4")
} yield (stuff1 + ", " + stuff2 + ", "+ stuff3 + ", " + stuff4)
哪里
class Service {
implicit val blockingExContext = scala.concurrent.ExecutionContext.fromExecutor(null: Executor)
def doServiceStuff(name:String): Future[String] = {
Future {
blocking {
println ( s"start ${name} on " + Thread.currentThread.getName)
Thread.sleep(5000)
"stuff_done"
}
}
}
}
但我看到的是(每一步大约需要 5 秒):
- 在 ForkJoinPool-3-worker-5 上启动服务 1
- 在 ForkJoinPool-3-worker-5 上启动服务 2
- 在 ForkJoinPool-3-worker-5 上启动服务 3
- 错误:java.util.concurrent.TimeoutException:期货在 [10 秒]
后超时
所有都在一个线程上运行,而不是使用现有的免费线程并尽可能快地完成所有线程 - 在 ~5 秒内。
但如果我选择:
val stuff1 = service1.doServiceStuff("service 1")
val stuff2 = service1.doServiceStuff("service 2")
val stuff3 = service1.doServiceStuff("service 3")
val stuff4 = service1.doServiceStuff("service 4")
Future.sequence(List(stuff1, stuff2, stuff3, stuff4)).map { list =>
list.foldLeft("") { (acc, x) => acc + " " + x }
}
..
全部在5秒内结束
领悟在哪个点上依次起作用?是吗?
它不能按顺序工作,它只是不能启动 Future
直到它们被创建(这会发生在你的情况下,你的 flatMap
和你之前的 Future
),所以如果你想并行处理它们,你需要提前创建它们(使用通常的隐式 ExecutionContext
s)。
可能 this tutorial 解释得更好(它使 withFilter
变得复杂):
The purchase future is completed only once both usdQuote and chfQuote
are completed– it depends on the values of both these futures so its
own computation cannot begin earlier.
The for-comprehension above is translated into:
val purchase = usdQuote flatMap { usd => chfQuote
.withFilter(chf => isProfitable(usd, chf))
.map(chf => connection.buy(amount, chf)) }
which is a bit harder to grasp than the for-comprehension, but we analyze it to better
understand the flatMap operation. The flatMap operation maps its own
value into some other future. Once this different future is completed,
the resulting future is completed with its value. In our example,
flatMap uses the value of the usdQuote future to map the value of the
chfQuote into a third future which sends a request to buy a certain
amount of Swiss francs. The resulting future purchase is completed
only once this third future returned from map completes.
你真正需要的只是 map2
而不是 flatMap
,因为你不使用前一个 Future
的返回值来创建新的 Future
.
我的机器上有 4 个内核。
而且我认为 for-comprehension 可以并行安排内容(不依赖于之前的结果..与 flatMap 相同):
val stuffResult: Future[String] = for {
stuff1 <- service1.doServiceStuff("service 1") // worker-5 (4+1)
stuff2 <- service1.doServiceStuff("service 2")
stuff3 <- service1.doServiceStuff("service 3")
stuff4 <- service1.doServiceStuff("service 4")
} yield (stuff1 + ", " + stuff2 + ", "+ stuff3 + ", " + stuff4)
哪里
class Service {
implicit val blockingExContext = scala.concurrent.ExecutionContext.fromExecutor(null: Executor)
def doServiceStuff(name:String): Future[String] = {
Future {
blocking {
println ( s"start ${name} on " + Thread.currentThread.getName)
Thread.sleep(5000)
"stuff_done"
}
}
}
}
但我看到的是(每一步大约需要 5 秒):
- 在 ForkJoinPool-3-worker-5 上启动服务 1
- 在 ForkJoinPool-3-worker-5 上启动服务 2
- 在 ForkJoinPool-3-worker-5 上启动服务 3
- 错误:java.util.concurrent.TimeoutException:期货在 [10 秒] 后超时
所有都在一个线程上运行,而不是使用现有的免费线程并尽可能快地完成所有线程 - 在 ~5 秒内。
但如果我选择:
val stuff1 = service1.doServiceStuff("service 1")
val stuff2 = service1.doServiceStuff("service 2")
val stuff3 = service1.doServiceStuff("service 3")
val stuff4 = service1.doServiceStuff("service 4")
Future.sequence(List(stuff1, stuff2, stuff3, stuff4)).map { list =>
list.foldLeft("") { (acc, x) => acc + " " + x }
}
..
全部在5秒内结束
领悟在哪个点上依次起作用?是吗?
它不能按顺序工作,它只是不能启动 Future
直到它们被创建(这会发生在你的情况下,你的 flatMap
和你之前的 Future
),所以如果你想并行处理它们,你需要提前创建它们(使用通常的隐式 ExecutionContext
s)。
可能 this tutorial 解释得更好(它使 withFilter
变得复杂):
The purchase future is completed only once both usdQuote and chfQuote are completed– it depends on the values of both these futures so its own computation cannot begin earlier.
The for-comprehension above is translated into:
val purchase = usdQuote flatMap { usd => chfQuote
.withFilter(chf => isProfitable(usd, chf))
.map(chf => connection.buy(amount, chf)) }
which is a bit harder to grasp than the for-comprehension, but we analyze it to better understand the flatMap operation. The flatMap operation maps its own value into some other future. Once this different future is completed, the resulting future is completed with its value. In our example, flatMap uses the value of the usdQuote future to map the value of the chfQuote into a third future which sends a request to buy a certain amount of Swiss francs. The resulting future purchase is completed only once this third future returned from map completes.
你真正需要的只是 map2
而不是 flatMap
,因为你不使用前一个 Future
的返回值来创建新的 Future
.