Spray.io: 什么时候(不)使用非阻塞路由处理?
Spray.io: When (not) to use non-blocking route handling?
如果我们正在考虑生产级 REST API,我们是否应该尽可能使用非阻塞,例如
def insertDbAsync(rows: RowList): Future[Unit] = ...
...
val route =
path("database" / "insertRowList") {
post {
entity(as[RowList]) { rows =>
log.info(s"${rows.length} rows received")
val async = insertDbAsync(rows)
onComplete(async) {
case Success(response) =>
complete("success")
case Failure(t) =>
complete("error")
}
}
}
}
我认为答案很可能是 'yes',但是在决定什么应该和不应该是阻塞代码时有哪些指导方针,为什么?
如果您正在使用 spray,就正确性而言,一切都必须是非阻塞的 - 否则您将阻塞(极少数)调度线程并且您的服务器将停止响应。
Spray 使用 Akka 作为底层平台,因此推荐与 actors (Blocking Needs Careful Management) 相同。阻塞代码可能需要太多线程,这可能:
kill actor的轻量级:默认情况下,数百万个actor可能在一个线程上运行。例如,假设一个非阻塞 actor 需要 0.001 个线程。一个阻塞的 actor(阻塞时间比平时多 100 倍)将平均占用 1 个线程(并不总是相同的线程)。首先,你拥有的线程越多 - 你释放的内存越多 - 每个阻塞的线程都持有阻塞前分配的完整调用堆栈,包括来自堆栈的引用(因此 GC 无法擦除它们)。其次,如果您有超过 number_of_processors
个线程 - 您将失去性能。第三,如果您使用一些动态池 - 添加新线程可能会花费大量时间。
导致线程饥饿 - 你可能有线程填充池,线程什么也不做 - 所以在阻塞操作完成之前无法处理新任务(0 % CPU 加载,但 100500等待处理的消息)。甚至可能造成死锁。但是,Akka 默认使用 Fork-Join-Pool 所以如果你的阻塞代码被管理(用 scala.concurrent.blocking
包围 - Await.result
里面有这样的包围) - 它会通过创建新线程而不是成本来防止饥饿屏蔽了一个,但不会补偿其他问题
传统上会导致死锁,因此不利于设计
如果代码被外部阻塞,你可以用 future 包围它:
import scala.concurrent._
val f = Future {
someNonBlockingCode()
blocking { //mark this thread as "blocked" so fork-join-pool may create another one to compensate
someBlocking()
}
}
内部独立演员:
f pipeTo sender //will send the result to `sender` actor
内喷路由:
onComplete(f) { .. }
最好在单独的 pool/dispatcher(基于 fork-join-pool)中执行此类期货。
P.S。作为 futures 的替代品(从设计的角度来看它们可能不太方便),您可以考虑 Akka I/O, Continuations/Coroutines, Actor's pools(也在单独的调度程序中)、Disruptor 等
如果我们正在考虑生产级 REST API,我们是否应该尽可能使用非阻塞,例如
def insertDbAsync(rows: RowList): Future[Unit] = ...
...
val route =
path("database" / "insertRowList") {
post {
entity(as[RowList]) { rows =>
log.info(s"${rows.length} rows received")
val async = insertDbAsync(rows)
onComplete(async) {
case Success(response) =>
complete("success")
case Failure(t) =>
complete("error")
}
}
}
}
我认为答案很可能是 'yes',但是在决定什么应该和不应该是阻塞代码时有哪些指导方针,为什么?
如果您正在使用 spray,就正确性而言,一切都必须是非阻塞的 - 否则您将阻塞(极少数)调度线程并且您的服务器将停止响应。
Spray 使用 Akka 作为底层平台,因此推荐与 actors (Blocking Needs Careful Management) 相同。阻塞代码可能需要太多线程,这可能:
kill actor的轻量级:默认情况下,数百万个actor可能在一个线程上运行。例如,假设一个非阻塞 actor 需要 0.001 个线程。一个阻塞的 actor(阻塞时间比平时多 100 倍)将平均占用 1 个线程(并不总是相同的线程)。首先,你拥有的线程越多 - 你释放的内存越多 - 每个阻塞的线程都持有阻塞前分配的完整调用堆栈,包括来自堆栈的引用(因此 GC 无法擦除它们)。其次,如果您有超过
number_of_processors
个线程 - 您将失去性能。第三,如果您使用一些动态池 - 添加新线程可能会花费大量时间。导致线程饥饿 - 你可能有线程填充池,线程什么也不做 - 所以在阻塞操作完成之前无法处理新任务(0 % CPU 加载,但 100500等待处理的消息)。甚至可能造成死锁。但是,Akka 默认使用 Fork-Join-Pool 所以如果你的阻塞代码被管理(用
scala.concurrent.blocking
包围 -Await.result
里面有这样的包围) - 它会通过创建新线程而不是成本来防止饥饿屏蔽了一个,但不会补偿其他问题传统上会导致死锁,因此不利于设计
如果代码被外部阻塞,你可以用 future 包围它:
import scala.concurrent._
val f = Future {
someNonBlockingCode()
blocking { //mark this thread as "blocked" so fork-join-pool may create another one to compensate
someBlocking()
}
}
内部独立演员:
f pipeTo sender //will send the result to `sender` actor
内喷路由:
onComplete(f) { .. }
最好在单独的 pool/dispatcher(基于 fork-join-pool)中执行此类期货。
P.S。作为 futures 的替代品(从设计的角度来看它们可能不太方便),您可以考虑 Akka I/O, Continuations/Coroutines, Actor's pools(也在单独的调度程序中)、Disruptor 等