在失败的情况下从询问中解决 Akka 期货
Resolving Akka futures from ask in the event of a failure
我在 Spray 应用程序中使用询问模式调用 Actor,并将结果作为 HTTP 响应返回。我将参与者的故障映射到自定义错误代码。
val authActor = context.actorOf(Props[AuthenticationActor])
callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user =>
complete(StatusCodes.OK, user)
}
def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = {
onComplete(f) {
case Success(value: T) => cb(value)
case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage)
case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.")
//In reality this returns a custom error object.
}
}
这在 authActor 发送失败时正常工作,但如果 authActor 抛出异常,则在请求超时完成之前不会发生任何事情。例如:
override def receive: Receive = {
case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token")
}
我知道 Akka 文档是这么说的
To complete the future with an exception you need send a Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.
但考虑到我在 Spray 路由 actor 和服务 actor 之间使用了很多接口,我宁愿不使用 try/catch 包装每个子 actor 的接收部分。有没有更好的方法实现子actor中异常的自动处理,一旦出现异常立即解决未来?
编辑:这是我目前的解决方案。但是,给每个童星都这么搞,还是挺麻烦的。
override def receive: Receive = {
case default =>
try {
default match {
case _ => throw new ServiceException("")//Actual code would go here
}
}
catch {
case se: ServiceException =>
logger.error("Service error raised:", se)
sender ! Failure(se)
case ex: Exception =>
sender ! Failure(ex)
throw ex
}
}
这样,如果是预期错误(即 ServiceException),将通过创建失败来处理。如果出乎意料,它会立即 returns 失败,以便将来解决,但随后会抛出异常,因此它仍然可以由 SupervisorStrategy 处理。
如果您想要一种在发生意外异常时自动将响应发送回发件人的方法,那么类似这样的方法可能适合您:
trait FailurePropatingActor extends Actor{
override def preRestart(reason:Throwable, message:Option[Any]){
super.preRestart(reason, message)
sender() ! Status.Failure(reason)
}
}
我们覆盖 preRestart
并将失败作为 Status.Failure
传播回发送方,这将导致上游 Future
失败。另外,在这里调用 super.preRestart
很重要,因为那是 child 停止的地方。在演员中使用它看起来像这样:
case class GetElement(list:List[Int], index:Int)
class MySimpleActor extends FailurePropatingActor {
def receive = {
case GetElement(list, i) =>
val result = list(i)
sender() ! result
}
}
如果我像这样调用这个 actor 的实例:
import akka.pattern.ask
import concurrent.duration._
val system = ActorSystem("test")
import system.dispatcher
implicit val timeout = Timeout(2 seconds)
val ref = system.actorOf(Props[MySimpleActor])
val fut = ref ? GetElement(List(1,2,3), 6)
fut onComplete{
case util.Success(result) =>
println(s"success: $result")
case util.Failure(ex) =>
println(s"FAIL: ${ex.getMessage}")
ex.printStackTrace()
}
然后它会正确地击中我的 Failure
块。现在,当 Futures
不参与扩展该特征的参与者(如此处的简单参与者)时,该基本特征中的代码运行良好。但是如果你使用 Future
s 那么你需要小心,因为 Future
中发生的异常不会导致 actor 重启,而且在 preRestart
中,调用 sender()
不会 return 正确的引用,因为演员已经进入下一条消息。像这样的演员表明了这个问题:
class MyBadFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
def receive = {
case GetElement(list, i) =>
val orig = sender()
val fut = Future{
val result = list(i)
orig ! result
}
}
}
如果我们在之前的测试代码中使用这个actor,我们总是会在失败的情况下超时。为了缓解这种情况,您需要像这样将期货结果通过管道返回给发送者:
class MyGoodFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
import akka.pattern.pipe
def receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut pipeTo sender()
}
}
在这种特殊情况下,actor 本身不会重新启动,因为它没有遇到未捕获的异常。现在,如果你的 actor 需要在 future 之后做一些额外的处理,你可以通过管道返回 self 并在你得到 Status.Failure
:
时显式失败
class MyGoodFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
import akka.pattern.pipe
def receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut.to(self, sender())
case d:Double =>
sender() ! d * 2
case Status.Failure(ex) =>
throw ex
}
}
如果这种行为变得普遍,您可以像这样将其提供给任何需要它的参与者:
trait StatusFailureHandling{ me:Actor =>
def failureHandling:Receive = {
case Status.Failure(ex) =>
throw ex
}
}
class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{
import context.dispatcher
import akka.pattern.pipe
def receive = myReceive orElse failureHandling
def myReceive:Receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut.to(self, sender())
case d:Double =>
sender() ! d * 2
}
}
我在 Spray 应用程序中使用询问模式调用 Actor,并将结果作为 HTTP 响应返回。我将参与者的故障映射到自定义错误代码。
val authActor = context.actorOf(Props[AuthenticationActor])
callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user =>
complete(StatusCodes.OK, user)
}
def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = {
onComplete(f) {
case Success(value: T) => cb(value)
case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage)
case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.")
//In reality this returns a custom error object.
}
}
这在 authActor 发送失败时正常工作,但如果 authActor 抛出异常,则在请求超时完成之前不会发生任何事情。例如:
override def receive: Receive = {
case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token")
}
我知道 Akka 文档是这么说的
To complete the future with an exception you need send a Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.
但考虑到我在 Spray 路由 actor 和服务 actor 之间使用了很多接口,我宁愿不使用 try/catch 包装每个子 actor 的接收部分。有没有更好的方法实现子actor中异常的自动处理,一旦出现异常立即解决未来?
编辑:这是我目前的解决方案。但是,给每个童星都这么搞,还是挺麻烦的。
override def receive: Receive = {
case default =>
try {
default match {
case _ => throw new ServiceException("")//Actual code would go here
}
}
catch {
case se: ServiceException =>
logger.error("Service error raised:", se)
sender ! Failure(se)
case ex: Exception =>
sender ! Failure(ex)
throw ex
}
}
这样,如果是预期错误(即 ServiceException),将通过创建失败来处理。如果出乎意料,它会立即 returns 失败,以便将来解决,但随后会抛出异常,因此它仍然可以由 SupervisorStrategy 处理。
如果您想要一种在发生意外异常时自动将响应发送回发件人的方法,那么类似这样的方法可能适合您:
trait FailurePropatingActor extends Actor{
override def preRestart(reason:Throwable, message:Option[Any]){
super.preRestart(reason, message)
sender() ! Status.Failure(reason)
}
}
我们覆盖 preRestart
并将失败作为 Status.Failure
传播回发送方,这将导致上游 Future
失败。另外,在这里调用 super.preRestart
很重要,因为那是 child 停止的地方。在演员中使用它看起来像这样:
case class GetElement(list:List[Int], index:Int)
class MySimpleActor extends FailurePropatingActor {
def receive = {
case GetElement(list, i) =>
val result = list(i)
sender() ! result
}
}
如果我像这样调用这个 actor 的实例:
import akka.pattern.ask
import concurrent.duration._
val system = ActorSystem("test")
import system.dispatcher
implicit val timeout = Timeout(2 seconds)
val ref = system.actorOf(Props[MySimpleActor])
val fut = ref ? GetElement(List(1,2,3), 6)
fut onComplete{
case util.Success(result) =>
println(s"success: $result")
case util.Failure(ex) =>
println(s"FAIL: ${ex.getMessage}")
ex.printStackTrace()
}
然后它会正确地击中我的 Failure
块。现在,当 Futures
不参与扩展该特征的参与者(如此处的简单参与者)时,该基本特征中的代码运行良好。但是如果你使用 Future
s 那么你需要小心,因为 Future
中发生的异常不会导致 actor 重启,而且在 preRestart
中,调用 sender()
不会 return 正确的引用,因为演员已经进入下一条消息。像这样的演员表明了这个问题:
class MyBadFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
def receive = {
case GetElement(list, i) =>
val orig = sender()
val fut = Future{
val result = list(i)
orig ! result
}
}
}
如果我们在之前的测试代码中使用这个actor,我们总是会在失败的情况下超时。为了缓解这种情况,您需要像这样将期货结果通过管道返回给发送者:
class MyGoodFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
import akka.pattern.pipe
def receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut pipeTo sender()
}
}
在这种特殊情况下,actor 本身不会重新启动,因为它没有遇到未捕获的异常。现在,如果你的 actor 需要在 future 之后做一些额外的处理,你可以通过管道返回 self 并在你得到 Status.Failure
:
class MyGoodFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
import akka.pattern.pipe
def receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut.to(self, sender())
case d:Double =>
sender() ! d * 2
case Status.Failure(ex) =>
throw ex
}
}
如果这种行为变得普遍,您可以像这样将其提供给任何需要它的参与者:
trait StatusFailureHandling{ me:Actor =>
def failureHandling:Receive = {
case Status.Failure(ex) =>
throw ex
}
}
class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{
import context.dispatcher
import akka.pattern.pipe
def receive = myReceive orElse failureHandling
def myReceive:Receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}
fut.to(self, sender())
case d:Double =>
sender() ! d * 2
}
}