Spray.io: 什么时候(不)使用非阻塞路由处理?

Spray.io: When (not) to use non-blocking route handling?

如果我们正在考虑生产级 REST API,我们是否应该尽可能使用非阻塞,例如

def insertDbAsync(rows: RowList): Future[Unit] = ...
...
val route =
path("database" / "insertRowList") {
  post {
    entity(as[RowList]) { rows =>
      log.info(s"${rows.length} rows received")
      val async = insertDbAsync(rows)
      onComplete(async) {
        case Success(response) =>
          complete("success")
        case Failure(t) =>
          complete("error")
      }
    }
  }
}

我认为答案很可能是 'yes',但是在决定什么应该和不应该是阻塞代码时有哪些指导方针,为什么?

如果您正在使用 spray,就正确性而言,一切都必须是非阻塞的 - 否则您将阻塞(极少数)调度线程并且您的服务器将停止响应。

Spray 使用 Akka 作为底层平台,因此推荐与 actors (Blocking Needs Careful Management) 相同。阻塞代码可能需要太多线程,这可能:

  • kill actor的轻量级:默认情况下,数百万个actor可能在一个线程上运行。例如,假设一个非阻塞 actor 需要 0.001 个线程。一个阻塞的 actor(阻塞时间比平时多 100 倍)将平均占用 1 个线程(并不总是相同的线程)。首先,你拥有的线程越多 - 你释放的内存越多 - 每个阻塞的线程都持有阻塞前分配的完整调用堆栈,包括来自堆栈的引用(因此 GC 无法擦除它们)。其次,如果您有超过 number_of_processors 个线程 - 您将失去性能。第三,如果您使用一些动态池 - 添加新线程可能会花费大量时间。

  • 导致线程饥饿 - 你可能有线程填充池,线程什么也不做 - 所以在阻塞操作完成之前无法处理新任务(0 % CPU 加载,但 100500等待处理的消息)。甚至可能造成死锁。但是,Akka 默认使用 Fork-Join-Pool 所以如果你的阻塞代码被管理(用 scala.concurrent.blocking 包围 - Await.result 里面有这样的包围) - 它会通过创建新线程而不是成本来防止饥饿屏蔽了一个,但不会补偿其他问题

  • 传统上会导致死锁,因此不利于设计

如果代码被外部阻塞,你可以用 future 包围它:

 import scala.concurrent._
 val f = Future {
     someNonBlockingCode()
     blocking { //mark this thread as "blocked" so fork-join-pool may create another one to compensate
        someBlocking()
     }  
 }

内部独立演员:

 f pipeTo sender //will send the result to `sender` actor

内喷路由:

 onComplete(f) { .. }

最好在单独的 pool/dispatcher(基于 fork-join-pool)中执行此类期货。

P.S。作为 futures 的替代品(从设计的角度来看它们可能不太方便),您可以考虑 Akka I/O, Continuations/Coroutines, Actor's pools(也在单独的调度程序中)、Disruptor 等