Scala 未来的理解失败

failure in Scala future's for comprehension

我有三个连续的 Futures 并像这样在 for comprehension 中使用

val comF = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
} yield {
  // something
}

comF onSuccess { }
comF onFailure { 
  // ----------------      Here is the problem  --------------------------------
  //
  // How do I know which future failed(throw exception), when the callback comes here ?
  // Thanks for the help! Different futures using different exceptions can solve it.
}

现在我有一个像 List[Future[T]] 这样的未来列表,我首先使用这种方法 (Why does this list-of-futures to future-of-list transformation compile and work?) 将它转移到 Future[List[T]]。然后我得到了未来

val fList: Future[List[T]]
fList on Failure {
  // 
  // How do I know which is Fail now >??
}

考虑代码:

def func = {
  try {
    val x = maybeThrows
    val y = maybeThrowsToo
    val z = maybeThrowsAsWell
    result(x, y, x)
  } catch (RuntimeException e) {
    // How do I know which maybeThrows failed?
  }
}

Future 案例的工作原理基本相同。


甚至 List 中的分组计算也无济于事:

def func = {
  try {
    val x = maybeThrows
    val y = maybeThrowsToo
    val z = maybeThrowsAsWell
    val list = List(x, y, z)
    result(list)
  } catch (RuntimeException e) {
    // How do I know which maybeThrows failed?
  }
}

剧透: 你必须明确地跟踪计算失败的地方。如果用 try/catch 完成,它会产生一些样板。但幸运的是 Future(和 Try)样板并没有那么糟糕:

class TaggedException(val idx, exc: Exception)

def tagFailedWithIndex[T](idx: Int, f: Future[T]): Future[T] = 
  future recoverWith { case exc => Future.failed(new TaggedException(idx, exc)) }

val comF = for {
  f1 <- tagFailedWithIndex(0, future1)
  f2 <- tagFailedWithIndex(1, future2)
  f3 <- tagFailedWithIndex(2, future3)
} yield something(f1, f2, f3)

comF onFailure { 
  case exc: TaggedException => "%d computation failed".format(exc.idx)
}

剧透 你必须明确地跟踪哪个计算失败了。如果用 try/catch 完成,会产生很多样板文件。但幸运的是有 Try,而 Future 的行为更加相同:

class TaggedException(val idx, exc: Exception)

def tagFailedWithIndex[T](idx: Int, f: Future[T]): Future[T] = 
  future recoverWith { case exc => Future.failed(new TaggedException(idx, exc)) }

val comF = for {
  f1 <- tagFailedWithIndex(0, future1)
  f2 <- tagFailedWithIndex(1, future2)
  f3 <- tagFailedWithIndex(2, future3)
} yield something(f1, f2, f3)

comF onFailure { 
  case exc: TaggedException => "%d computation failed".format(exc.idx)
}

为了能够区分各个故障,您必须单独处理故障。可能是这样的:

Future[List[T]] = Future.sequence(
    futures.zipWithIndex.map { case (f, idx) => 
        f.onFailure { case ex => println(s"Future $idx failed with $ex") }
        f
    } 
)

问题:flatMap 将 Futures 合并为一个 Future

您正在使用 flatMap,因此未来嵌套在一个未来中。

您的密码是

import scala.concurrent.Future

val future1, future2, future3 = Future[Any]()

val comF = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
} yield {
  // something
}

comF onSuccess { ??? }
comF onFailure { ??? }

当我将脱糖规则应用于您的理解时,我得到

val comF = (future1).flatMap { case f1 => 
  (future2).flatMap { case f2 => 
    (future3).map { case f3 => {
        // something
      }
    }
  }
}

这里可以清楚的看到flatMap的用法。原则上 flatMap 做两件事:它对第一个 Future 的结果应用一个函数。这必须是一个将第一个 Future 的结果映射到另一个 Future 的函数,即 Future 嵌套在第一个 Future 中(这是 map部分)。然后 'unnestes' 两个 Futures 并将它们合并为一个 Futureflat 部分)。在这一点上,两个 Futures 已经不存在了(从概念的角度来看;技术上它们仍然存在)。相反只有一个 'merged' Future.

flatMap 的两次调用从原来的三个中创建了一个新的 Future。这就是为什么您无法找出三个原始 Futures 中的哪一个引发异常的原因:它是其中的 none。相反,新创建的 Future 运行s 并引发异常。

解决方案:单独跟踪您的进度

如果你想知道你的计算运行是在哪一步引发异常之前,你必须单独跟踪进度,例如通过 Exception 中的附加参数。或者,您可以一个接一个地删除 flatMap 和 运行 个人 Futures

关于如何跟踪进度的示例:

首先我们创建一个新的异常class,它包含真正的异常以及异常来自哪里的一些信息

class ExceptionWithOrigin[T](val origin : T, nested : Throwable) extends Exception(nested)

object ExceptionWithOrigin {
  def wrapFuture[T,U](origin : T, f : Future[U]) : Future[U] = {
    f.transform(Predef.identity, new ExceptionWithOrigin[T](origin,_))
  }

  def wrapFuture[U](f : Future[U]) = wrapFuture(f,f)
}

对于Futures我们没有特殊要求。

val future1,future2,future3 = Future[Any]()

然后我们使用我们新创建的异常 class.

的伴随对象中的辅助方法来包装给定的 Futures
import ExceptionWithOrigin._

val comF = for {
  result1 <- wrapFuture(future1)
  result2 <- wrapFuture(future2)
  result3 <- wrapFuture(future3)
} yield {
  // something
}

当您捕捉到一些异常 ex 时,您现在可以使用 ex.origin 找出它的来源。当然,起源并不完全正确。原来的Futuresfuture1future2future3并没有真正执行。而是新创建的 Future 运行s,由 flatMap 创建。但尽管如此,起源仍然有效。

有关更好命名的提示

顺便说一句,您应该将 f1f2f3 重命名为 result1result2result3。它们代表的不是一个Future,而是每个Future的计算结果(每个Future的值returns)。

尊贵的@Rit (Brendan McAdams) 在他的演讲中终于说服我尝试使用分离式 Scalaz A Skeptic's Look at scalaz' "Gateway Drugs"。您的代码将包含在析取中,如下所示:

val comF = for {
  f1 <- future1 \/> "Future 1 failed"
  f2 <- future2 \/> "Future 2 failed"
  f3 <- future3 \/> "Future 3 failed"
} yield {
  // something
}

Related quesiton