Scala 等待期货序列
Scala waiting for sequence of futures
我希望像下面这样的代码会等待两个未来,但事实并非如此。
object Fiddle {
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
val seq = Future.sequence(lf)
seq.onComplete {
_ => lf.foreach(f => println(f.isCompleted))
}
}
val a = FuturesSequence
我原以为 seq.onComplete
会等它们全部完成后再完成,但事实并非如此;结果是:
true
false
.sequence
在 scala.concurrent.Future 的来源中有点难以理解,我想知道我将如何实现一个并行等待(动态大小的)序列的所有原始期货,或者什么可能是这里的问题。
编辑: 相关问题:https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)
由 Future.sequence
生成的 Future
在以下任一情况下完成:
- 所有期货都已成功完成,或者
- 其中一个期货失败了
第二点是你的情况,一旦包装 Future
之一失败就立即完成是有意义的,因为包装 Future
只能容纳一个 Throwable
在失败的情况下。等待其他期货没有意义,因为结果将是同样的失败。
等待所有结果(失败与否)的一种常见方法是 "lift" 将失败放入未来的新表示中,以便所有未来都以某种结果完成(尽管它们可能以结果完成代表失败)。一种自然的方法是提升到 Try
.
Twitter's implementation of futures 提供了一个 liftToTry
方法使这个变得微不足道,但你可以用标准库的实现做一些类似的事情:
import scala.util.{ Failure, Success, Try }
val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
_.map(Success(_)).recover { case t => Failure(t) }
)
现在 Future.sequence(lifted)
将在每个 future 完成时完成,并将使用 Try
表示成功和失败。
因此,假设执行上下文当然是隐式可用的,等待一系列 futures 的所有原始 futures 的通用解决方案可能如下所示。
import scala.util.{ Failure, Success, Try }
private def lift[T](futures: Seq[Future[T]]) =
futures.map(_.map { Success(_) }.recover { case t => Failure(t) })
def waitAll[T](futures: Seq[Future[T]]) =
Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used
waitAll(SeqOfFutures).map {
// do whatever with the completed futures
}
这是支持上一个答案的例子。有一种简单的方法可以只使用标准的 Scala API 来做到这一点。
在示例中,我正在创建 3 个期货。这些将分别在 5、7 和 9 秒时完成。对 Await.result
的调用将阻塞,直到所有 futures 都已解决。一旦所有 3 个期货都完成,a
将设置为 List(5,7,9)
并继续执行。
此外,如果任何一个 futures 抛出异常,Await.result
将立即解除阻塞并抛出异常。取消注释 Exception(...)
行以查看实际效果。
try {
val a = Await.result(Future.sequence(Seq(
Future({
blocking {
Thread.sleep(5000)
}
System.err.println("A")
5
}),
Future({
blocking {
Thread.sleep(7000)
}
System.err.println("B")
7
//throw new Exception("Ha!")
}),
Future({
blocking {
Thread.sleep(9000)
}
System.err.println("C")
9
}))),
Duration("100 sec"))
System.err.println(a)
} catch {
case e: Exception ⇒
e.printStackTrace()
}
我们可以通过隐式 class:
用它自己的 onComplete
方法丰富 Seq[Future[T]]
def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
f map { Success(_) } recover { case e => Failure(e) }
def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
fs map { lift(_) }
implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
Future.sequence(lift(fs)) onComplete {
case Success(s) => f(s)
case Failure(e) => throw e // will never happen, because of the Try lifting
}
}
}
然后,在您特定的 MWE 中,您可以:
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2)
lf onComplete { _ map {
case Success(v) => ???
case Failure(e) => ???
}}
此解决方案的优势在于,您可以像对单个期货一样对一系列期货调用 onComplete
。
创造未来,尽量避免多余的箍。
implicit val ec = ExecutionContext.global
val f1 = Future {
Try {
throw new Throwable("kaboom")
}
}
val f2 = Future {
Try {
Thread.sleep(1000L)
2
}
}
Await.result(
Future.sequence(Seq(f1, f2)), Duration("2 sec")
) foreach {
case Success(res) => println(s"Success. $res")
case Failure(e) => println(s"Failure. ${e.getMessage}")
}
尽管这是一个很老的问题但这就是我最近得到它的方式运行。
object Fiddle {
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
val seq = Future.sequence(lf)
import scala.concurrent.duration._
Await.result(seq, Duration.Inf)
}
这不会完成,要等到所有未来都完成。您可以根据您的用例更改等待时间。我将它保持为无限,这对我来说是必需的。
我希望像下面这样的代码会等待两个未来,但事实并非如此。
object Fiddle {
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
val seq = Future.sequence(lf)
seq.onComplete {
_ => lf.foreach(f => println(f.isCompleted))
}
}
val a = FuturesSequence
我原以为 seq.onComplete
会等它们全部完成后再完成,但事实并非如此;结果是:
true
false
.sequence
在 scala.concurrent.Future 的来源中有点难以理解,我想知道我将如何实现一个并行等待(动态大小的)序列的所有原始期货,或者什么可能是这里的问题。
编辑: 相关问题:https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)
由 Future.sequence
生成的 Future
在以下任一情况下完成:
- 所有期货都已成功完成,或者
- 其中一个期货失败了
第二点是你的情况,一旦包装 Future
之一失败就立即完成是有意义的,因为包装 Future
只能容纳一个 Throwable
在失败的情况下。等待其他期货没有意义,因为结果将是同样的失败。
等待所有结果(失败与否)的一种常见方法是 "lift" 将失败放入未来的新表示中,以便所有未来都以某种结果完成(尽管它们可能以结果完成代表失败)。一种自然的方法是提升到 Try
.
Twitter's implementation of futures 提供了一个 liftToTry
方法使这个变得微不足道,但你可以用标准库的实现做一些类似的事情:
import scala.util.{ Failure, Success, Try }
val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
_.map(Success(_)).recover { case t => Failure(t) }
)
现在 Future.sequence(lifted)
将在每个 future 完成时完成,并将使用 Try
表示成功和失败。
因此,假设执行上下文当然是隐式可用的,等待一系列 futures 的所有原始 futures 的通用解决方案可能如下所示。
import scala.util.{ Failure, Success, Try }
private def lift[T](futures: Seq[Future[T]]) =
futures.map(_.map { Success(_) }.recover { case t => Failure(t) })
def waitAll[T](futures: Seq[Future[T]]) =
Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used
waitAll(SeqOfFutures).map {
// do whatever with the completed futures
}
这是支持上一个答案的例子。有一种简单的方法可以只使用标准的 Scala API 来做到这一点。
在示例中,我正在创建 3 个期货。这些将分别在 5、7 和 9 秒时完成。对 Await.result
的调用将阻塞,直到所有 futures 都已解决。一旦所有 3 个期货都完成,a
将设置为 List(5,7,9)
并继续执行。
此外,如果任何一个 futures 抛出异常,Await.result
将立即解除阻塞并抛出异常。取消注释 Exception(...)
行以查看实际效果。
try {
val a = Await.result(Future.sequence(Seq(
Future({
blocking {
Thread.sleep(5000)
}
System.err.println("A")
5
}),
Future({
blocking {
Thread.sleep(7000)
}
System.err.println("B")
7
//throw new Exception("Ha!")
}),
Future({
blocking {
Thread.sleep(9000)
}
System.err.println("C")
9
}))),
Duration("100 sec"))
System.err.println(a)
} catch {
case e: Exception ⇒
e.printStackTrace()
}
我们可以通过隐式 class:
用它自己的onComplete
方法丰富 Seq[Future[T]]
def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
f map { Success(_) } recover { case e => Failure(e) }
def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
fs map { lift(_) }
implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
Future.sequence(lift(fs)) onComplete {
case Success(s) => f(s)
case Failure(e) => throw e // will never happen, because of the Try lifting
}
}
}
然后,在您特定的 MWE 中,您可以:
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2)
lf onComplete { _ map {
case Success(v) => ???
case Failure(e) => ???
}}
此解决方案的优势在于,您可以像对单个期货一样对一系列期货调用 onComplete
。
创造未来,尽量避免多余的箍。
implicit val ec = ExecutionContext.global
val f1 = Future {
Try {
throw new Throwable("kaboom")
}
}
val f2 = Future {
Try {
Thread.sleep(1000L)
2
}
}
Await.result(
Future.sequence(Seq(f1, f2)), Duration("2 sec")
) foreach {
case Success(res) => println(s"Success. $res")
case Failure(e) => println(s"Failure. ${e.getMessage}")
}
尽管这是一个很老的问题但这就是我最近得到它的方式运行。
object Fiddle {
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
val seq = Future.sequence(lf)
import scala.concurrent.duration._
Await.result(seq, Duration.Inf)
}
这不会完成,要等到所有未来都完成。您可以根据您的用例更改等待时间。我将它保持为无限,这对我来说是必需的。