Akka long-运行 从 Future 回调中初始化和崩溃 actor
Akka long-running initialization and crashing actor from within Future callback
我有一个角色负责与 Web 服务对话。我为了与服务对话,演员需要获取授权令牌。令牌会定期过期,因此参与者还需要定期检查令牌是否已过期并获取新令牌。
我的问题:
执行一些可能在构造函数中失败的长期 运行 任务的公认方法是什么?在我的示例中,我的 actor 在成功从远程服务接收到令牌之前不能被视为 "ready"。这可能会失败,应该重试。
我应该如何处理服务无法访问的情况,阻止我获取新令牌。我想抛出一个由我的主管策略处理的异常(某种节流重启策略)
这基本上就是我现在所拥有的:
case object CheckAuth
case object AuthFailed
case class UpdateAuth(token: Token)
var auth: Token = Await.result(authorize(), 2.seconds) // throws if server cannot be reached or i'm denied
val tick = scheduler.schedule(10.seconds, 10.seconds, self, CheckAuth)
override def postStop = tick.cancel()
override def receive: Receive = {
case AuthFailed => throw new Exception("Auth failed")
case UpdateAuth(a) => auth = a
case CheckAuth => {
if ( authHasExpired() ) {
authorize() onComplete {
case Success(r) => self ! UpdateAuth(r)
case Failure(e) => self ! AuthFailed // this feels dirty
}
}
}
}
对此建模的正确方法是让 Actor 具有两种状态,显式或隐式。这些之间的选择取决于客户端与该 Actor 的交互方式:
- Actor 在没有可用令牌时否定地回答请求,或者
- Actor 在没有令牌的情况下存储请求,并在可用时回答。
显式模型使用context.become()
:
class A extends Actor with Stash {
def auth(): Unit = {
authorize() pipeTo self
context.system.scheduler.scheduleOnce(2.seconds, self, AuthTimeout)
}
def update(token: AuthToken): Unit = {
context.become(running(token))
context.system.scheduler.scheduleOnce(10.seconds, self, CheckAuth)
}
auth()
def receive = initial
val initial: Receive = {
case UpdateAuth(token) =>
unstashAll()
update(token)
case AuthTimeout => context.stop(self)
case _ => stash()
}
def running(token: AuthToken): Receive = {
case CheckAuth => auth()
case AuthFailed => context.become(initial)
case UpdateAuth(token) => update(token)
...
}
}
隐式模型在 Actor 中存储一个 var token: Option[AuthToken]
,从 None 开始。
关于失败的处理:你可以做任何你想做的事,我建议停止 Actor 并让 supervisor 在给定时间后重新创建它——按照 supervisorStrategy 重启总是立即的。
我有一个角色负责与 Web 服务对话。我为了与服务对话,演员需要获取授权令牌。令牌会定期过期,因此参与者还需要定期检查令牌是否已过期并获取新令牌。
我的问题:
执行一些可能在构造函数中失败的长期 运行 任务的公认方法是什么?在我的示例中,我的 actor 在成功从远程服务接收到令牌之前不能被视为 "ready"。这可能会失败,应该重试。
我应该如何处理服务无法访问的情况,阻止我获取新令牌。我想抛出一个由我的主管策略处理的异常(某种节流重启策略)
这基本上就是我现在所拥有的:
case object CheckAuth
case object AuthFailed
case class UpdateAuth(token: Token)
var auth: Token = Await.result(authorize(), 2.seconds) // throws if server cannot be reached or i'm denied
val tick = scheduler.schedule(10.seconds, 10.seconds, self, CheckAuth)
override def postStop = tick.cancel()
override def receive: Receive = {
case AuthFailed => throw new Exception("Auth failed")
case UpdateAuth(a) => auth = a
case CheckAuth => {
if ( authHasExpired() ) {
authorize() onComplete {
case Success(r) => self ! UpdateAuth(r)
case Failure(e) => self ! AuthFailed // this feels dirty
}
}
}
}
对此建模的正确方法是让 Actor 具有两种状态,显式或隐式。这些之间的选择取决于客户端与该 Actor 的交互方式:
- Actor 在没有可用令牌时否定地回答请求,或者
- Actor 在没有令牌的情况下存储请求,并在可用时回答。
显式模型使用context.become()
:
class A extends Actor with Stash {
def auth(): Unit = {
authorize() pipeTo self
context.system.scheduler.scheduleOnce(2.seconds, self, AuthTimeout)
}
def update(token: AuthToken): Unit = {
context.become(running(token))
context.system.scheduler.scheduleOnce(10.seconds, self, CheckAuth)
}
auth()
def receive = initial
val initial: Receive = {
case UpdateAuth(token) =>
unstashAll()
update(token)
case AuthTimeout => context.stop(self)
case _ => stash()
}
def running(token: AuthToken): Receive = {
case CheckAuth => auth()
case AuthFailed => context.become(initial)
case UpdateAuth(token) => update(token)
...
}
}
隐式模型在 Actor 中存储一个 var token: Option[AuthToken]
,从 None 开始。
关于失败的处理:你可以做任何你想做的事,我建议停止 Actor 并让 supervisor 在给定时间后重新创建它——按照 supervisorStrategy 重启总是立即的。