Akka HTTP:将来阻塞会阻塞服务器

Akka HTTP: Blocking in a future blocks the server

我正在尝试使用 Akka HTTP 对我的请求进行基本验证。 碰巧我有一个外部资源可以通过它进行身份验证,所以我必须对该资源进行 rest 调用。

这需要一些时间,在处理过程中,我的 API 的其余部分似乎已被阻塞,等待此调用。 我用一个非常简单的例子重现了这个:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

如果我 post 到日志端点,我的获取端点也会卡住等待 5 秒,这是日志端点规定的。

这是预期的行为吗?如果是,我如何在不阻塞整个 API 的情况下进行阻塞操作?

奇怪,但对我来说一切正常(无阻塞)。这是代码:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer

import scala.concurrent.Future


object Main {

  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val routes: Route = (post & entity(as[String])) { e =>
    complete {
      Future {
        Thread.sleep(5000)
        e
      }
    }
  } ~
    (get & path(Segment)) { r =>
      complete {
        "get"
      }
    }

  def main(args: Array[String]) {

    Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
      case e =>
        system.shutdown()
    }
  }
}

您还可以将异步代码包装到 onCompleteonSuccess 指令中:

onComplete(Future{Thread.sleep(5000)}){e} 

onSuccess(Future{Thread.sleep(5000)}){complete(e)}

您观察到的是预期的行为 – 当然,它非常糟糕。好在存在已知的解决方案和最佳实践来防范它。在这个回答中,我想花一些时间来解释这个问题,简短的,详细的,然后是深入的——阅读愉快!

简短回答:“不要阻塞路由基础设施!”,始终使用专用调度程序来阻塞操作!

观察到的症状的原因: 问题是您正在使用 context.dispatcher 作为阻塞期货执行的调度程序。路由基础设施使用同一个调度程序(简单来说就是“线程束”)来实际处理传入的请求——因此,如果您阻塞所有可用线程,您最终会使路由基础设施挨饿。 (有待讨论和基准测试的是 Akka HTTP 是否可以防止这种情况发生,我会将其添加到我的研究中 todo-list)。

必须特别小心地处理阻塞,以免影响同一调度程序的其他用户(这就是为什么我们如此简单地将执行分离到不同的调度程序),如 Akka 文档部分所述:Blocking needs careful management.

我想在这里引起注意的另一件事是,如果可能的话,应该完全避免阻塞 API - 如果您的长 运行 操作不是真正的一个操作,但是其中的一系列,您可以将它们分成不同的演员,或者对未来进行排序。不管怎样,只是想指出——如果可能的话,避免这样的阻塞调用,但如果你不得不——那么下面解释了如何正确处理这些。

In-depth分析及解决方案:

现在我们从概念上知道问题出在哪里了,让我们看看上面的代码到底出了什么问题,以及这个问题的正确解决方案是怎样的:

颜色=线程状态:

  • 绿松石色 – 睡觉
  • 橙色 - 等待中
  • 绿色 - 可运行

现在让我们研究 3 段代码以及它们如何影响调度程序和应用程序的性能。为强制执行此行为,已将应用程序置于以下负载之下:

  • [a] 继续请求 GET 请求(请参阅上面初始问题中的代码),它不会在那里阻塞
  • [b] 然后在一段时间后触发 2000 个 POST 请求,这将导致返回 future 之前的 5 秒阻塞

1) [bad] 错误代码的调度程序行为:

// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

因此我们将我们的应用程序暴露给 [a] 负载,您可以看到许多 akka.actor.default-dispatcher 线程 - 它们正在处理请求 - 小的绿色片段和橙色意味着其他人实际上在那里闲置。

然后我们启动 [b] 加载,这会导致这些线程阻塞——您可以看到一个早期线程“default-dispatcher-2,3,4”在之前处于空闲状态后进入阻塞状态。我们还观察到池在增长——新线程启动“default-dispatcher-18,19,20,21...”但是它们立即进入睡眠状态(!)——我们在这里浪费宝贵的资源!

此类启动线程的数量取决于默认的调度程序配置,但可能不会超过 50 个左右。由于我们刚刚触发了 2k 个阻塞操作,我们使整个线程池处于饥饿状态——阻塞操作占主导地位,以至于路由基础设施没有可用线程来处理其他请求——非常糟糕!

让我们做点什么(顺便说一句,这是 Akka 的最佳实践——始终隔离阻塞行为,如下所示):

2) [good!] 调度员行为结构良好 code/dispatchers:

在您的 application.conf 中配置此专用于阻止行为的调度程序:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

您应该在 Akka Dispatchers 文档中阅读更多内容,以了解此处的各种选项。不过,要点是我们选择了一个 ThreadPoolExecutor ,它对线程数有硬性限制,它可用于阻塞操作。大小设置取决于您的应用程序做什么,以及您的服务器有多少核。

接下来我们需要使用它,而不是默认的:

// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

我们使用相同的负载对应用程序施加压力,首先是一些正常请求,然后我们添加阻塞请求。这就是线程池在这种情况下的行为方式:

所以最初正常的请求很容易被默认的调度程序处理,你可以在那里看到几条绿线——那是实际的执行(我并没有真正让服务器承受重负载,所以它大部分时间都是空闲的)。

现在,当我们开始发出阻塞操作时,my-blocking-dispatcher-* 启动,并启动到配置的线程数。它处理那里的所有睡眠。此外,在这些线程上一段时间没有发生任何事情后,它会关闭它们。如果我们用另一堆阻塞来攻击服务器,池将启动新线程来处理 sleep()-ing 它们,bu与此同时——我们不会在“呆在那里什么都不做”上浪费我们宝贵的线索。

使用此设置时,正常 GET 请求的吞吐量没有受到影响,它们仍然愉快地在(仍然非常免费)默认调度程序上提供服务。

这是处理反应式应用程序中任何类型的阻塞的推荐方法。它通常被称为“隔离”(或“隔离”)应用程序的不良行为部分,在这种情况下不良行为是 sleeping/blocking.

3) [workaround-ish] blocking 正确应用时的调度程序行为:

在此示例中,我们使用 scaladoc for scala.concurrent.blocking 方法,该方法在遇到阻塞操作时可以提供帮助。它通常会导致启动更多线程以在阻塞操作中幸存下来。

// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it's a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

该应用的行为如下:

您会注意到 很多 新线程被创建,这是因为阻塞提示“哦,这会阻塞,所以我们需要更多线程”。这导致我们被阻塞的总时间小于 1) 示例中的时间,但是在阻塞操作完成后我们有数百个线程什么都不做......当然,它们最终将被关闭(FJP 这样做),但有一段时间我们会有大量(不受控制的)线程 运行,与 2) 解决方案相反,在解决方案中我们确切知道有多少线程用于阻塞行为。

总结:永远不要阻止默认调度程序:-)

最佳做法是使用 2) 中显示的模式,为可用的阻塞操作提供调度程序,并在那里执行它们。

讨论 Akka HTTP 版本2.0.1

使用的探查器: 很多人私下问我在回答这个问题时我用什么探查器来可视化上面图片中的线程状态,所以在这里添加这个信息:我使用 YourKit which is an awesome commercial profiler (free for OSS), though you can achieve the same results using the free VisualVM from OpenJDK.