Scala Future 和 TimeoutException:如何知道根本原因?

Scala Future and TimeoutException: how to know the root cause?

假设我有以下代码:

val futureInt1 = getIntAsync1();
val futureInt2 = getIntAsync2();

val futureSum = for {
  int1 <- futureInt1
  int2 <- futureInt2
} yield (int1 + int2) 

val sum = Await.result(futureSum, 60 seconds)

现在假设 getIntAsync1getIntAsync2 之一需要很长时间,并导致 Await.result 抛出异常:

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60 seconds]

我怎么知道 getIntAsync1getIntAsync2 中的哪一个仍在等待处理并实际导致超时?

请注意,这里我将 2 个 futures 与 zip 合并,这是问题的一个简单示例,但在我的应用程序中,我有不同级别的此类代码(即 getIntAsync1 本身可以使用Future.zipFuture.sequence, map/flatMap/applicative)

不知何故,当我的主线程发生超时时,我希望能够记录挂起的并发操作堆栈跟踪,这样我就可以知道系统中的瓶颈在哪里。


我有一个现有的遗留 API 后端,它还没有完全反应并且不会很快。我试图通过使用并发来增加响应时间。但是自从使用了这种代码之后,理解为什么我的应用程序中的某件事需要花费大量时间变得更加痛苦。如果您能提供帮助我调试此类问题的任何提示,我将不胜感激。

您可以通过调用其 isComplete 方法检查未来是否已完成

if (futureInt1.isComplete) { /*futureInt2 must be the culprit */ } 
if (futureInt2.isComplete) { /*futureInt1 must be the culprit */ } 

作为第一种方法,我建议将您的 Future[Int] 提升为 Future[Try[Int]]。类似的东西:

object impl {

  def checkException[T](in: Future[T]): Future[Try[T]] = 
    in.map(Success(_)).recover {
      case e: Throwable => {   
        Failure(new Exception("Error in future: " + in))
      }
    }

  implicit class FutureCheck(s: Future[Int]) {
   def check = checkException(s)
  }
}

然后是一个合并结果的小函数,类似这样:

object test {

  import impl._

  val futureInt1 = Future{ 1 }
  val futureInt2 = Future{ 2 }

  def combine(a: Try[Int], b: Try[Int])(f: (Int, Int) => (Int)) : Try[Int] = {
    if(a.isSuccess && b.isSuccess) {
      Success(f(a.get, b.get))
    }
   else 
     Failure(new Exception("Error adding results"))
  }

  val futureSum = for {
    int1 <- futureInt1.check
    int2 <- futureInt2.check
  } yield combine(int1, int2)(_ + _)
}

在 futureSum 中,您将有一个带有整数的 Try[Int] 或一个带有与可能错误相对应的异常的 Failure。

也许这会有用

val futureInt1 = getIntAsync1();
val futureInt2 = getIntAsync2();

val futureSum = for {
  int1 <- futureInt1
  int2 <- futureInt2
} yield (int1 + int2) 

Try(Await.result(futureSum, 60 seconds)) match {
   case Success(sum) => println(sum)
   case Failure(e)   => println("we got timeout. the unfinished futures are: " + List(futureInt1, futureInt2).filter(!_.isCompleted)
}

建议的解决方案将每个未来从 for 块包装到需要超时和名称的 TimelyFuture 中。它在内部使用 Await 来检测个别超时。 请记住,这种使用期货的方式不适用于生产代码,因为它使用了阻塞。它仅用于诊断,以找出哪些期货需要时间才能完成。

package xxx

import java.util.concurrent.TimeoutException

import scala.concurrent.{Future, _}
import scala.concurrent.duration.Duration
import scala.util._
import scala.concurrent.duration._

class TimelyFuture[T](f: Future[T], name: String, duration: Duration) extends Future[T] {

  override def onComplete[U](ff: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = f.onComplete(x => ff(x))

  override def isCompleted: Boolean = f.isCompleted

  override def value: Option[Try[T]] = f.value

  @scala.throws[InterruptedException](classOf[InterruptedException])
  @scala.throws[TimeoutException](classOf[TimeoutException])
  override def ready(atMost: Duration)(implicit permit: CanAwait): TimelyFuture.this.type = {
    Try(f.ready(atMost)(permit)) match {
      case Success(v) => this
      case Failure(e) => this
    }
  }

  @scala.throws[Exception](classOf[Exception])
  override def result(atMost: Duration)(implicit permit: CanAwait): T = {
    f.result(atMost)
  }

  override def transform[S](ff: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] = {
    val p = Promise[S]()
    Try(Await.result(f, duration)) match {
      case s@Success(_) => ff(s) match {
        case Success(v) => p.success(v)
        case Failure(e) => p.failure(e)
      }
      case Failure(e) => e match {
        case e: TimeoutException => p.failure(new RuntimeException(s"future ${name} has timed out after ${duration}"))
        case _ => p.failure(e)
      }
    }
    p.future
  }

  override def transformWith[S](ff: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] = {
    val p = Promise[S]()
    Try(Await.result(f, duration)) match {
      case s@Success(_) => ff(s).onComplete({
        case Success(v) => p.success(v)
        case Failure(e) => p.failure(e)
      })
      case Failure(e) => e match {
        case e: TimeoutException => p.failure(new RuntimeException(s"future ${name} has timed out after ${duration}"))
        case _ => p.failure(e)
      }
    }
    p.future
  }
}

object Main {

  import scala.concurrent.ExecutionContext.Implicits.global

  def main(args: Array[String]): Unit = {
    val f = Future {
      Thread.sleep(5);
      1
    }

    val g = Future {
      Thread.sleep(2000);
      2
    }

    val result: Future[(Int, Int)] = for {
      v1 <- new TimelyFuture(f, "f", 10 milliseconds)
      v2 <- new TimelyFuture(g, "g", 10 milliseconds)
    } yield (v1, v2)


    val sum = Await.result(result, 1 seconds) // as expected, this throws exception : "RuntimeException: future g has timed out after 10 milliseconds"
  }
}

实现的关键是 Future 在您的示例中不会超时 — 是您的调用线程最多暂停 X 次。

所以,如果你想在你的 Futures 中建模 time 你应该在每个分支上使用 zipWith 并压缩一个包含一定时间内值的 Future 。如果你使用 Akka 那么你可以使用 akka.pattern.after 和 Future.firstCompletedOf.

现在,即使你这样做了,你如何弄清楚为什么你的任何期货没有及时完成,也许它们依赖于其他未完成的期货.

问题归结为:您是否尝试对吞吐量进行根本原因分析?那么你应该监控你的 ExecutionContext,而不是你的 Futures。期货只是价值。

如果您只是在寻找单个 future 花费了很长时间(或与其他 future 一起)的信息指标,那么最好的办法是在创建 futures 时使用包装器来记录指标:

    object InstrumentedFuture {
        def now() = System.currentTimeMillis()
        def apply[T](name: String)(code: => T): Future[T] = {
           val start = now()
           val f = Future { 
            code 
           }
           f.onComplete {
              case _ => println(s"Future ${name} took ${now() - start} ms") 
           }
           f
        }
    }


    val future1 = InstrumentedFuture("Calculator") { /*...code...*/ }
    val future2 = InstrumentedFuture("Differentiator") { /*...code...*/ }