Scala 期货和 `andThen` 异常传播

Scala futures and `andThen` exception propagation

我正在阅读 scala.concurrent.Future 模块中 andThen 函数的 Scala 2.11.8 文档,其中说明如下:

def andThen[U](pf: PartialFunction[Try[T], U])
              (implicit executor: ExecutionContext): Future[T]

Applies the side-effecting function to the result of this future, and returns a new future with the result of this future.

This method allows one to enforce that the callbacks are executed in a specified order.

Note that if one of the chained andThen callbacks throws an exception, that exception is not propagated to the subsequent andThen callbacks. Instead, the subsequent andThen callbacks are given the original value of this future.

我不确定 andThen 不传播异常的确切含义是什么,也没有提供示例。例如,如果我这样做:

Future.successful { throw new RuntimeException("test") } andThen
                  { case _ => println("test") }

在 Scala REPL 中我得到:

java.lang.RuntimeException: test
  ... 32 elided

因此传播了异常。有人可以提供一个有意义的例子,这到底意味着什么,以及使用 andThen 和我想从中恢复的抛出异常的代码是否安全。谢谢。

我认为类型签名是最好的文档。 如您所见,andThen 接受 T => U(为简单起见省略了 PF 和 Try),并返回 Future[T]。 所以你可以想到,andThen 执行一些效果并返回原来的未来。因此,如果您的部分函数引发异常,它将不会传播到其他 andThens,它们将作用于原始 future:

  import scala.concurrent.ExecutionContext.Implicits.global
  Future{ 2 } andThen {
    case _ => throw new RuntimeException("test")
  } andThen {
    case v ⇒ println("After exception"); println(v)
  }
Thread.sleep(500)

这会打印:

java.lang.RuntimeException: test    
After exception
Success(2)

PS。再次阅读您的示例。我想你只是最后忘记了 Thread.sleep 来让未来在程序结束前完成。所以你可能理解正确。

不要 throw Future.successful {} 中的异常。

这是正确的做法

Future { throw new RuntimeException("test") } andThen
                  { case _ => println("test") }

使用下面这行代码

就可以理解andThen的使用
Future.successful { 1 } andThen { case _ =>  "foo" }

REPL

@ Future.successful { 1 } andThen { case _ =>  "foo" }
res7: Future[Int] = Success(1)

REPL

@ Future.successful { 1 } andThen { case _ =>  println("foo") }
foo
res8: Future[Int] = Success(1)

REPL

@ val result = Future.successful { 1 } andThen { case _ =>  "foo" }
result: Future[Int] = Success(1)

在上面的例子中

我们可以看到执行了andthen之后的部分函数,​​但是return类型的部分函数被忽略了。最后得到的输出是 Future 的结果,即 Future[Int]

这意味着addThen用于在Future完成后立即执行副作用函数。

当未来失败时

REPL

@ val result = Future { throw new Exception("bar") } andThen { case _ =>  "foo" }
result: Future[Nothing] = Failure(java.lang.Exception: bar)

REPL

@ val result = Future { throw new Exception("bar") } andThen { case _ =>  println("foo") }
foo
result: Future[Nothing] = Failure(java.lang.Exception: bar)

当 future 失败时也是如此。 andThen 之后的代码执行,但是 andThen 之后的代码的结果被忽略,最终结果是 Future 结果。

所以 andThen 用于 运行 Future 完成后的副作用代码。 andThen 也将最终输出保持为 Futures 输出。

这就是 andThen 在标准库中的实现方式。

andThen 位于 Future class

 def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    onComplete {
      case r => try pf.applyOrElse[Try[T], Any](r, Predef.conforms[Try[T]]) finally p complete r
    }
    p.future
  }

1) Applies the side-effecting function to the result of this future, and returns a new future with the result of this future.

pf 是副作用代码,因为它的输出类型没有被使用(不能被使用)。 p.future就是他说的新的未来。 Promise用前面的Future结果完成(看上面addThen的实现)

在 finally 块中 p complete r 表示使用 p.future 创建新的 Future 并使用先前的 future 结果完成,即 r

2) This method allows one to enforce that the callbacks are executed in a specified order.

是的。您可以使用多个 andThen 调用链接多个回调,这些回调将按照 andThen 调用的顺序一个接一个地执行。与onComplete方法相比,可以多次使用注册多个回调,但这些回调的顺序是不确定的。

3) Note that if one of the chained andThen callbacks throws an exception, that exception is not propagated to the subsequent andThen callbacks. Instead, the subsequent andThen callbacks are given the original value of this future.

r 是前一个future的结果给了pf(看上面的andThen代码)