Akka - 处理阻塞操作
Akka - Handling Blocking Operations
我有一个使用 Scala、Akka 和 Spray 构建的应用程序。
在应用程序中,我有一个对外部 bash 脚本的阻塞调用。
它通过 sys.process 调用,平均需要 100 毫秒到 运行。
我一直在使用 Curl 和 Apache Bench 测试应用程序以获得一些性能指标。我还使用 nanoTime() 为 sys.process 调用自身计时。
如果我使用单个 Curl 调用进行测试,应用程序会按预期执行,sys.process 调用大约需要 100 毫秒。
当我通过使用 Apache Bench 处理多个并发请求来增加服务器负载时,sys.process 调用开始从 100 毫秒急剧增加到接近 1000 毫秒,具体取决于负载。
通过阅读 Scala 文档,我认为这里的问题是阻塞调用正在使用执行上下文中的所有可用线程,这会降低性能。
但是我怎样才能最好地满足这个需求呢?
我尝试创建自定义执行上下文并将调用包装在阻塞的未来...
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(100))
val future = Future {
blocking {
sys.process.Process(...),
new java.io.File("/var/...")).!!
}
}
然后我不得不等待未来使用 onComplete 完成,但这反过来似乎会阻塞线程,并且根据 Apache Bench 报告的每秒请求数,我看不到任何明显的性能提升。
以上代码在 Actor class 中的一个方法中,由 receive 方法调用。
任何人都可以建议我应该如何构造此代码以增加线程数并尽可能减少阻塞代码所花费的时间?
即在它完成时将操作和 return 交给它。
谢谢
我会异步响应以避免阻塞行为:
class MyActor extends Actor {
val exec = Executors.newFixedThreadPool(max_threads)
def recieve = {
case Request(file) => {
exec.submit(new Runnable() {
def run() {
sys.process.Process(...),
new java.io.File("/var/...")).!!
self ! Done(file) // dont block, respond asynchronously
}
})
}
case Done(file) => completeRequest(file)
}
}
无需为此创建新线程。
class MyActor extends Actor {
def recieve = {
case Request(file) => {
val process=sys.process.Process.....
self ! CheckProcess(process, file)
}
case CheckProcess(process, file)=>{
if(doUppkeep(process)){
self ! CheckProcess(process, file)
} else {
Done(file)
}
}
case Done(file) => completeRequest(file)
}
def doUpKeep(process):Boolean={
// Do wathever is needed to keep the external process running
// avoid looping since this pethod will be called again
// return true if process is still running.
}
}
如果需要,您可以安排 CheckProcess 消息,而不仅仅是将它们推送到消息队列。
self.context.system.scheduler.scheduleOnce(10 milliseconds, self, CheckProcess(process, file))
这将使进程表现得更好一些,但每个请求可能会变慢一些。
此模式将使用的资源保持在最低限度,同时确保所有进程都得到关注。
我有一个使用 Scala、Akka 和 Spray 构建的应用程序。 在应用程序中,我有一个对外部 bash 脚本的阻塞调用。 它通过 sys.process 调用,平均需要 100 毫秒到 运行。
我一直在使用 Curl 和 Apache Bench 测试应用程序以获得一些性能指标。我还使用 nanoTime() 为 sys.process 调用自身计时。
如果我使用单个 Curl 调用进行测试,应用程序会按预期执行,sys.process 调用大约需要 100 毫秒。
当我通过使用 Apache Bench 处理多个并发请求来增加服务器负载时,sys.process 调用开始从 100 毫秒急剧增加到接近 1000 毫秒,具体取决于负载。
通过阅读 Scala 文档,我认为这里的问题是阻塞调用正在使用执行上下文中的所有可用线程,这会降低性能。
但是我怎样才能最好地满足这个需求呢?
我尝试创建自定义执行上下文并将调用包装在阻塞的未来...
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(100))
val future = Future {
blocking {
sys.process.Process(...),
new java.io.File("/var/...")).!!
}
}
然后我不得不等待未来使用 onComplete 完成,但这反过来似乎会阻塞线程,并且根据 Apache Bench 报告的每秒请求数,我看不到任何明显的性能提升。
以上代码在 Actor class 中的一个方法中,由 receive 方法调用。
任何人都可以建议我应该如何构造此代码以增加线程数并尽可能减少阻塞代码所花费的时间? 即在它完成时将操作和 return 交给它。
谢谢
我会异步响应以避免阻塞行为:
class MyActor extends Actor {
val exec = Executors.newFixedThreadPool(max_threads)
def recieve = {
case Request(file) => {
exec.submit(new Runnable() {
def run() {
sys.process.Process(...),
new java.io.File("/var/...")).!!
self ! Done(file) // dont block, respond asynchronously
}
})
}
case Done(file) => completeRequest(file)
}
}
无需为此创建新线程。
class MyActor extends Actor {
def recieve = {
case Request(file) => {
val process=sys.process.Process.....
self ! CheckProcess(process, file)
}
case CheckProcess(process, file)=>{
if(doUppkeep(process)){
self ! CheckProcess(process, file)
} else {
Done(file)
}
}
case Done(file) => completeRequest(file)
}
def doUpKeep(process):Boolean={
// Do wathever is needed to keep the external process running
// avoid looping since this pethod will be called again
// return true if process is still running.
}
}
如果需要,您可以安排 CheckProcess 消息,而不仅仅是将它们推送到消息队列。
self.context.system.scheduler.scheduleOnce(10 milliseconds, self, CheckProcess(process, file))
这将使进程表现得更好一些,但每个请求可能会变慢一些。
此模式将使用的资源保持在最低限度,同时确保所有进程都得到关注。