Scala 等待期货序列

Scala waiting for sequence of futures

我希望像下面这样的代码会等待两个未来,但事实并非如此。

object Fiddle {
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

  val seq = Future.sequence(lf) 

  seq.onComplete {
    _ => lf.foreach(f => println(f.isCompleted))
  }
}

val a = FuturesSequence

我原以为 seq.onComplete 会等它们全部完成后再完成,但事实并非如此;结果是:

true
false

.sequence 在 scala.concurrent.Future 的来源中有点难以理解,我想知道我将如何实现一个并行等待(动态大小的)序列的所有原始期货,或者什么可能是这里的问题。

编辑: 相关问题:https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)

Future.sequence 生成的 Future 在以下任一情况下完成:

  • 所有期货都已成功完成,或者
  • 其中一个期货失败了

第二点是你的情况,一旦包装 Future 之一失败就立即完成是有意义的,因为包装 Future 只能容纳一个 Throwable 在失败的情况下。等待其他期货没有意义,因为结果将是同样的失败。

等待所有结果(失败与否)的一种常见方法是 "lift" 将失败放入未来的新表示中,以便所有未来都以某种结果完成(尽管它们可能以结果完成代表失败)。一种自然的方法是提升到 Try.

Twitter's implementation of futures 提供了一个 liftToTry 方法使这个变得微不足道,但你可以用标准库的实现做一些类似的事情:

import scala.util.{ Failure, Success, Try }

val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
  _.map(Success(_)).recover { case t => Failure(t) }
)

现在 Future.sequence(lifted) 将在每个 future 完成时完成,并将使用 Try 表示成功和失败。

因此,假设执行上下文当然是隐式可用的,等待一系列 futures 的所有原始 futures 的通用解决方案可能如下所示。

  import scala.util.{ Failure, Success, Try }

  private def lift[T](futures: Seq[Future[T]]) = 
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]]) =
    Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used

  waitAll(SeqOfFutures).map { 
    // do whatever with the completed futures
  }

这是支持上一个答案的例子。有一种简单的方法可以只使用标准的 Scala API 来做到这一点。

在示例中,我正在创建 3 个期货。这些将分别在 5、7 和 9 秒时完成。对 Await.result 的调用将阻塞,直到所有 futures 都已解决。一旦所有 3 个期货都完成,a 将设置为 List(5,7,9) 并继续执行。

此外,如果任何一个 futures 抛出异常,Await.result 将立即解除阻塞并抛出异常。取消注释 Exception(...) 行以查看实际效果。

  try {
    val a = Await.result(Future.sequence(Seq(
      Future({
        blocking {
          Thread.sleep(5000)
        }
        System.err.println("A")
        5
      }),
      Future({
        blocking {
          Thread.sleep(7000)
        }
        System.err.println("B")
        7
        //throw new Exception("Ha!")
      }),
      Future({
        blocking {
          Thread.sleep(9000)
        }
        System.err.println("C")
        9
      }))),
      Duration("100 sec"))

    System.err.println(a)
  } catch {
    case e: Exception ⇒
      e.printStackTrace()
  }

我们可以通过隐式 class:

用它自己的 onComplete 方法丰富 Seq[Future[T]]
  def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f map { Success(_) } recover { case e => Failure(e) }

  def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
    fs map { lift(_) }

  implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
    def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
      Future.sequence(lift(fs)) onComplete {
        case Success(s) => f(s)
        case Failure(e) => throw e // will never happen, because of the Try lifting
      }
    }
  }

然后,在您特定的 MWE 中,您可以:

  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2)

  lf onComplete { _ map {
    case Success(v) => ???
    case Failure(e) => ???
  }}

此解决方案的优势在于,您可以像对单个期货一样对一系列期货调用 onComplete

创造未来,尽量避免多余的箍。

implicit val ec = ExecutionContext.global

val f1 = Future {
  Try {
    throw new Throwable("kaboom")
  }
}

val f2 = Future {
  Try {
    Thread.sleep(1000L)
    2
  }
}

Await.result(
  Future.sequence(Seq(f1, f2)), Duration("2 sec")
) foreach {
  case Success(res) => println(s"Success. $res")
  case Failure(e)   => println(s"Failure. ${e.getMessage}")
}

尽管这是一个很老的问题但这就是我最近得到它的方式运行。

    object Fiddle {
      val f1 = Future {
        throw new Throwable("baaa") // emulating a future that bumped into an exception
      }
    
      val f2 = Future {
        Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
        2
      }
    
      val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
    
      val seq = Future.sequence(lf) 
      import scala.concurrent.duration._
      Await.result(seq, Duration.Inf) 
    }

这不会完成,要等到所有未来都完成。您可以根据您的用例更改等待时间。我将它保持为无限,这对我来说是必需的。