为了理解:如何 运行 顺序期货
For comprehension: how to run Futures sequentially
给出以下方法...
def doSomething1: Future[Int] = { ... }
def doSomething2: Future[Int] = { ... }
def doSomething3: Future[Int] = { ... }
...以及以下理解:
for {
x <- doSomething1
y <- doSomething2
z <- doSomething3
} yield x + y + z
三个方法运行并行,但在我的例子中doSomething2
必须在doSomething1
完成后运行。如何运行这三个方法顺序?
编辑
根据 Philosophus42 的建议,下面是 doSomething1
的可能实现:
def doSomething1: Future[Int] = {
// query the database for customers younger than 40;
// `find` returns a `Future` containing the number of matches
customerService.find(Json.obj("age" -> Json.obj("$lt" -> 40)))
}
... 所以 Future
是通过对另一个方法的内部调用创建的。
编辑 2
也许我过于简化了用例...我很抱歉。让我们再试一次,更接近真实的用例。以下是三种方法:
for {
// get all the transactions generated by the exchange service
transactions <- exchange.orderTransactions(orderId)
//for each transaction create a log
logs <- Future.sequence(tansactions.map { transaction =>
for {
// update trading order status
_ <- orderService.findAndUpdate(transaction.orderId, "Executed")
// create new log
log <- logService.insert(Log(
transactionId => transaction.id,
orderId => transaction.orderId,
...
))
} yield log
})
} yield logs
我想做的是为与订单关联的每笔交易创建一个日志。 logService.insert
被调用多次,即使 transactions
只包含一个条目。
我看到有两种变体可以实现这一点:
第一个:
确保 Futures 是在 for comprehension 中创建的。这意味着您的函数应该这样定义:def doSomething1: Future[Int] = Future { ... }
。在那种情况下,for comprehension 应该按顺序执行 Futures。
第二个:
使用你需要在其他人开始之前完成的 Future 的 map 功能:
doSomething1.map{ i =>
for {
y <- doSomething2
z <- doSomething3
} yield i + y + z
}
评论你的post
首先,doSomethingX
里面的代码是怎样的?更令人恼火的是,使用您给定的代码,期货 运行 并行。
回答
为了使Future
顺序执行,只需使用
for {
v1 <- Future { ..block1... }
v2 <- Future { ..block2... }
} yield combine(v1, v2)
这样做的原因是语句 Future { ..body.. } 开始异步计算,在那个时间点评估语句。
以上理解脱糖
Future { ..block1.. }
.flatMap( v1 =>
Future { ..block>.. }
.map( v2 => combine(v1,v2) )
)
很明显,
- 如果
Future{ ...block1... }
有可用的结果,
flatMap
方法被触发,
- 然后触发执行
Future { ...block2... }
。
因此Future { ...block2... }
在之后执行Future { ...block1... }
附加信息
一个Future
Future {
<block>
}
立即通过ExecutionContext
触发包含块的执行
片段 1:
val f1 = Future { <body> }
val f2 = Future { <otherbody> }
这两个计算是 运行 并行的(如果您的 ExecutionContext
是这样设置的),因为这两个值会立即计算。
片段 2:
结构
def f1 = Future { ..... }
将开始执行未来,一旦 f1
被调用
编辑:
j3d,我仍然很困惑,为什么你的代码没有按预期工作,如果你的陈述是正确的,那么 Future 是在 中创建的 computeSomethingX
方法。
这是一个代码片段,证明 computeSomething2
在 computeSomething1
之后执行
导入scala.concurrent。{等待,未来}
导入 scala.concurrent.duration._
object Playground {
import scala.concurrent.ExecutionContext.Implicits.global
def computeSomething1 : Future[Int] = {
Future {
for (i <- 1 to 10) {
println("computeSomething1")
Thread.sleep(500)
}
10
}
}
def computeSomething2 : Future[String] = {
Future {
for(i <- 1 to 10) {
println("computeSomething2")
Thread.sleep(800)
}
"hello"
}
}
def main(args: Array[String]) : Unit = {
val resultFuture: Future[String] = for {
v1 <- computeSomething1
v2 <- computeSomething2
} yield v2 + v1.toString
// evil "wait" for result
val result = Await.result(resultFuture, Duration.Inf)
println( s"Result: ${result}")
}
}
有输出
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
Result: hello10
编辑 2
如果您希望它们并行执行,请预先创建期货(此处f1
和f2
)
def main(args: Array[String]) : Unit = {
val f1 = computeSomething1
val f2 = computeSomething2
val resultFuture: Future[String] = for {
v1 <- f1
v2 <- f2
} yield v2 + v1.toString
// evil "wait" for result
val result = Await.result(resultFuture, Duration.Inf)
println( s"Result: ${result}")
}
给出以下方法...
def doSomething1: Future[Int] = { ... }
def doSomething2: Future[Int] = { ... }
def doSomething3: Future[Int] = { ... }
...以及以下理解:
for {
x <- doSomething1
y <- doSomething2
z <- doSomething3
} yield x + y + z
三个方法运行并行,但在我的例子中doSomething2
必须在doSomething1
完成后运行。如何运行这三个方法顺序?
编辑
根据 Philosophus42 的建议,下面是 doSomething1
的可能实现:
def doSomething1: Future[Int] = {
// query the database for customers younger than 40;
// `find` returns a `Future` containing the number of matches
customerService.find(Json.obj("age" -> Json.obj("$lt" -> 40)))
}
... 所以 Future
是通过对另一个方法的内部调用创建的。
编辑 2
也许我过于简化了用例...我很抱歉。让我们再试一次,更接近真实的用例。以下是三种方法:
for {
// get all the transactions generated by the exchange service
transactions <- exchange.orderTransactions(orderId)
//for each transaction create a log
logs <- Future.sequence(tansactions.map { transaction =>
for {
// update trading order status
_ <- orderService.findAndUpdate(transaction.orderId, "Executed")
// create new log
log <- logService.insert(Log(
transactionId => transaction.id,
orderId => transaction.orderId,
...
))
} yield log
})
} yield logs
我想做的是为与订单关联的每笔交易创建一个日志。 logService.insert
被调用多次,即使 transactions
只包含一个条目。
我看到有两种变体可以实现这一点:
第一个:
确保 Futures 是在 for comprehension 中创建的。这意味着您的函数应该这样定义:def doSomething1: Future[Int] = Future { ... }
。在那种情况下,for comprehension 应该按顺序执行 Futures。
第二个: 使用你需要在其他人开始之前完成的 Future 的 map 功能:
doSomething1.map{ i =>
for {
y <- doSomething2
z <- doSomething3
} yield i + y + z
}
评论你的post
首先,doSomethingX
里面的代码是怎样的?更令人恼火的是,使用您给定的代码,期货 运行 并行。
回答
为了使Future
顺序执行,只需使用
for {
v1 <- Future { ..block1... }
v2 <- Future { ..block2... }
} yield combine(v1, v2)
这样做的原因是语句 Future { ..body.. } 开始异步计算,在那个时间点评估语句。
以上理解脱糖
Future { ..block1.. }
.flatMap( v1 =>
Future { ..block>.. }
.map( v2 => combine(v1,v2) )
)
很明显,
- 如果
Future{ ...block1... }
有可用的结果, flatMap
方法被触发,- 然后触发执行
Future { ...block2... }
。
因此Future { ...block2... }
在之后执行Future { ...block1... }
附加信息
一个Future
Future {
<block>
}
立即通过ExecutionContext
片段 1:
val f1 = Future { <body> }
val f2 = Future { <otherbody> }
这两个计算是 运行 并行的(如果您的 ExecutionContext
是这样设置的),因为这两个值会立即计算。
片段 2:
结构
def f1 = Future { ..... }
将开始执行未来,一旦 f1
被调用
编辑:
j3d,我仍然很困惑,为什么你的代码没有按预期工作,如果你的陈述是正确的,那么 Future 是在 中创建的 computeSomethingX
方法。
这是一个代码片段,证明 computeSomething2
在 computeSomething1
导入scala.concurrent。{等待,未来} 导入 scala.concurrent.duration._
object Playground {
import scala.concurrent.ExecutionContext.Implicits.global
def computeSomething1 : Future[Int] = {
Future {
for (i <- 1 to 10) {
println("computeSomething1")
Thread.sleep(500)
}
10
}
}
def computeSomething2 : Future[String] = {
Future {
for(i <- 1 to 10) {
println("computeSomething2")
Thread.sleep(800)
}
"hello"
}
}
def main(args: Array[String]) : Unit = {
val resultFuture: Future[String] = for {
v1 <- computeSomething1
v2 <- computeSomething2
} yield v2 + v1.toString
// evil "wait" for result
val result = Await.result(resultFuture, Duration.Inf)
println( s"Result: ${result}")
}
}
有输出
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
Result: hello10
编辑 2
如果您希望它们并行执行,请预先创建期货(此处f1
和f2
)
def main(args: Array[String]) : Unit = {
val f1 = computeSomething1
val f2 = computeSomething2
val resultFuture: Future[String] = for {
v1 <- f1
v2 <- f2
} yield v2 + v1.toString
// evil "wait" for result
val result = Await.result(resultFuture, Duration.Inf)
println( s"Result: ${result}")
}