为什么我的 Akka Typed actor 没有被其监护者(guardian)重新启动?
Why isn't my typed actor being restarted by its guardian?
我正在试用 Akka Typed。我有一个虚拟 actor,用来模拟一个不稳定(flaky)的 worker:
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import scala.util.Random
/**
 * Dummy worker used to simulate a flaky service instance.
 */
object DummyActor {

  /**
   * Builds the worker behavior.
   *
   * During setup the actor registers itself with the receptionist under
   * `serviceKey`, logs a start-up message, blocks for one second and then
   * either fails or settles into an empty behavior.
   *
   * @param serviceKey key under which this actor registers with the receptionist
   * @tparam T message type associated with `serviceKey`
   * @return a behavior that ignores every message once setup has succeeded
   * @throws IllegalStateException from the setup block when the random draw
   *                               from 0 to 9 is greater than 5
   */
  def behavior[T](serviceKey: ServiceKey[T]): Behavior[Any] =
    Behaviors.setup { context =>
      context.system.receptionist ! Receptionist.Register(serviceKey, context.self)
      context.log.info("Woohoo, I'm alive!")
      // Deliberately blocks the setup for one second before the failure draw.
      Thread.sleep(1000)
      val draw = Random.nextInt(10)
      if (draw > 5) throw new IllegalStateException("Something bad happened!")
      else Behaviors.empty
    }
}
及其带路由器的监护人:
import akka.actor.typed.{Behavior, SupervisorStrategy}
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
/**
 * Guardian that starts five supervised `DummyActor` workers and one router.
 */
object MyCluster {

  /** Receptionist key shared by the workers (registration) and the router (subscription). */
  val serviceKey: ServiceKey[String] = ServiceKey[String]("cluster")

  // Supervision strategy applied to every worker.
  // NOTE(review): plain `restart` reportedly does not restart an actor whose
  // setup block throws (to avoid an infinite restart loop); `restartWithBackoff`
  // is the suggested strategy for start-up failures. Confirm against the
  // SupervisorStrategy documentation of the Akka version in use.
  val strategy = SupervisorStrategy.restart

  /** Guardian behavior: spawns the workers and the router, then ignores all messages. */
  val behavior: Behavior[String] = Behaviors.setup { context =>
    for (i <- 1 to 5) {
      context.log.info(s"Spawning actor #$i")
      val supervisedWorker =
        Behaviors.supervise(DummyActor.behavior(serviceKey)).onFailure[Throwable](strategy)
      context.spawn(supervisedWorker, s"actor-$i")
    }
    // The router is only spawned here; its reference is not used by the guardian.
    context.spawn(RandomRouter.clusterRouter(serviceKey), "router")
    Behaviors.empty
  }
}
我的路由器监听 receptionist 的事件:
import java.util.concurrent.ThreadLocalRandom
import akka.actor.Address
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent.{ReachabilityEvent, ReachableMember, UnreachableMember}
import akka.cluster.typed.{Cluster, Subscribe}
/**
 * Random router that learns its routees from the receptionist and avoids
 * routees located on cluster members that are currently unreachable.
 */
object RandomRouter {

  // Envelope for cluster reachability events so they can be delivered to the
  // router through a message adapter.
  private final case class WrappedReachabilityEvent(event: ReachabilityEvent)

  /**
   * Creates a router for the services registered under `serviceKey`.
   *
   * The router subscribes to receptionist listings for `serviceKey` and to
   * cluster reachability events. Every other message is forwarded to one
   * randomly chosen routee whose address is not in the unreachable set.
   * The message is reported as unhandled when no routee is known, or when
   * every known routee is on an unreachable member.
   *
   * @param serviceKey key identifying the routees to discover
   * @tparam T type of the messages being routed
   * @return the router behavior, narrowed to `T`
   */
  def clusterRouter[T](serviceKey: ServiceKey[T]): Behavior[T] =
    Behaviors.setup[Any] { ctx =>
      ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)

      val cluster = Cluster(ctx.system)
      // External messages such as reachability events are mapped into this
      // actor's own protocol with a message adapter.
      val reachabilityAdapter: ActorRef[ReachabilityEvent] =
        ctx.messageAdapter(WrappedReachabilityEvent.apply)
      cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent])

      // State is carried in the parameters: each update returns a new behavior.
      def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] =
        Behaviors.receive { (ctx, msg) =>
          msg match {
            case serviceKey.Listing(services) =>
              if (services.isEmpty) {
                ctx.log.info("Found no services")
              } else {
                ctx.log.info(s"Found services: ${services.map(_.path).mkString(", ")}")
              }
              routingBehavior(services.toVector, unreachable)

            case WrappedReachabilityEvent(event) =>
              event match {
                case UnreachableMember(m) =>
                  ctx.log.warning(s"Member ${m.uniqueAddress.address} has become unreachable")
                  routingBehavior(routees, unreachable + m.address)
                case ReachableMember(m) =>
                  ctx.log.info(s"Member ${m.uniqueAddress.address} has become reachable again")
                  routingBehavior(routees, unreachable - m.address)
              }

            // T is erased at runtime, so any remaining message is treated as a T.
            case other: T @unchecked =>
              val reachableRoutees =
                if (unreachable.isEmpty) routees
                else routees.filterNot(r => unreachable(r.path.address))
              // Check after filtering: nextInt(0) would throw
              // IllegalArgumentException when all routees are unreachable.
              if (reachableRoutees.isEmpty)
                Behaviors.unhandled
              else {
                val i = ThreadLocalRandom.current.nextInt(reachableRoutees.size)
                reachableRoutees(i) ! other
                Behaviors.same
              }
          }
        }

      routingBehavior(Vector.empty, Set.empty)
    }.narrow[T]
}
但我发现,启动集群后,最终会有一些 actor 死掉(这在预期之中),但它们没有被重新启动,只留下这样的日志:
[INFO] [06/01/2018 18:11:14.242] [cluster-system-akka.actor.default-dispatcher-4] [akka://cluster-system/user/router] Found services: akka://cluster-system/user/actor-4, akka://cluster-system/user/actor-3, akka://cluster-system/user/actor-1, akka://cluster-system/user/actor-5
为什么 MyCluster#strategy 不重新启动失败的 actor?
SupervisorStrategy#restart 的代码及其相关注释中包含了答案。
如果异常是在 actor 的 setup 阶段抛出的,直接重新启动会很危险,因为这可能造成无限重启循环。因此,对于启动阶段的监督,建议改用 restartWithBackoff。
我正在试用 Akka Typed。我有一个虚拟 actor,用来模拟一个不稳定(flaky)的 worker:
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import scala.util.Random
/**
 * Dummy worker used to simulate a flaky service instance.
 */
object DummyActor {

  /**
   * Builds the worker behavior.
   *
   * During setup the actor registers itself with the receptionist under
   * `serviceKey`, logs a start-up message, blocks for one second and then
   * either fails or settles into an empty behavior.
   *
   * @param serviceKey key under which this actor registers with the receptionist
   * @tparam T message type associated with `serviceKey`
   * @return a behavior that ignores every message once setup has succeeded
   * @throws IllegalStateException from the setup block when the random draw
   *                               from 0 to 9 is greater than 5
   */
  def behavior[T](serviceKey: ServiceKey[T]): Behavior[Any] =
    Behaviors.setup { context =>
      context.system.receptionist ! Receptionist.Register(serviceKey, context.self)
      context.log.info("Woohoo, I'm alive!")
      // Deliberately blocks the setup for one second before the failure draw.
      Thread.sleep(1000)
      val draw = Random.nextInt(10)
      if (draw > 5) throw new IllegalStateException("Something bad happened!")
      else Behaviors.empty
    }
}
及其带路由器的监护人:
import akka.actor.typed.{Behavior, SupervisorStrategy}
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
/**
 * Guardian that starts five supervised `DummyActor` workers and one router.
 */
object MyCluster {

  /** Receptionist key shared by the workers (registration) and the router (subscription). */
  val serviceKey: ServiceKey[String] = ServiceKey[String]("cluster")

  // Supervision strategy applied to every worker.
  // NOTE(review): plain `restart` reportedly does not restart an actor whose
  // setup block throws (to avoid an infinite restart loop); `restartWithBackoff`
  // is the suggested strategy for start-up failures. Confirm against the
  // SupervisorStrategy documentation of the Akka version in use.
  val strategy = SupervisorStrategy.restart

  /** Guardian behavior: spawns the workers and the router, then ignores all messages. */
  val behavior: Behavior[String] = Behaviors.setup { context =>
    for (i <- 1 to 5) {
      context.log.info(s"Spawning actor #$i")
      val supervisedWorker =
        Behaviors.supervise(DummyActor.behavior(serviceKey)).onFailure[Throwable](strategy)
      context.spawn(supervisedWorker, s"actor-$i")
    }
    // The router is only spawned here; its reference is not used by the guardian.
    context.spawn(RandomRouter.clusterRouter(serviceKey), "router")
    Behaviors.empty
  }
}
我的路由器监听 receptionist 的事件:
import java.util.concurrent.ThreadLocalRandom
import akka.actor.Address
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent.{ReachabilityEvent, ReachableMember, UnreachableMember}
import akka.cluster.typed.{Cluster, Subscribe}
/**
 * Random router that learns its routees from the receptionist and avoids
 * routees located on cluster members that are currently unreachable.
 */
object RandomRouter {

  // Envelope for cluster reachability events so they can be delivered to the
  // router through a message adapter.
  private final case class WrappedReachabilityEvent(event: ReachabilityEvent)

  /**
   * Creates a router for the services registered under `serviceKey`.
   *
   * The router subscribes to receptionist listings for `serviceKey` and to
   * cluster reachability events. Every other message is forwarded to one
   * randomly chosen routee whose address is not in the unreachable set.
   * The message is reported as unhandled when no routee is known, or when
   * every known routee is on an unreachable member.
   *
   * @param serviceKey key identifying the routees to discover
   * @tparam T type of the messages being routed
   * @return the router behavior, narrowed to `T`
   */
  def clusterRouter[T](serviceKey: ServiceKey[T]): Behavior[T] =
    Behaviors.setup[Any] { ctx =>
      ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)

      val cluster = Cluster(ctx.system)
      // External messages such as reachability events are mapped into this
      // actor's own protocol with a message adapter.
      val reachabilityAdapter: ActorRef[ReachabilityEvent] =
        ctx.messageAdapter(WrappedReachabilityEvent.apply)
      cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent])

      // State is carried in the parameters: each update returns a new behavior.
      def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] =
        Behaviors.receive { (ctx, msg) =>
          msg match {
            case serviceKey.Listing(services) =>
              if (services.isEmpty) {
                ctx.log.info("Found no services")
              } else {
                ctx.log.info(s"Found services: ${services.map(_.path).mkString(", ")}")
              }
              routingBehavior(services.toVector, unreachable)

            case WrappedReachabilityEvent(event) =>
              event match {
                case UnreachableMember(m) =>
                  ctx.log.warning(s"Member ${m.uniqueAddress.address} has become unreachable")
                  routingBehavior(routees, unreachable + m.address)
                case ReachableMember(m) =>
                  ctx.log.info(s"Member ${m.uniqueAddress.address} has become reachable again")
                  routingBehavior(routees, unreachable - m.address)
              }

            // T is erased at runtime, so any remaining message is treated as a T.
            case other: T @unchecked =>
              val reachableRoutees =
                if (unreachable.isEmpty) routees
                else routees.filterNot(r => unreachable(r.path.address))
              // Check after filtering: nextInt(0) would throw
              // IllegalArgumentException when all routees are unreachable.
              if (reachableRoutees.isEmpty)
                Behaviors.unhandled
              else {
                val i = ThreadLocalRandom.current.nextInt(reachableRoutees.size)
                reachableRoutees(i) ! other
                Behaviors.same
              }
          }
        }

      routingBehavior(Vector.empty, Set.empty)
    }.narrow[T]
}
但我发现,启动集群后,最终会有一些 actor 死掉(这在预期之中),但它们没有被重新启动,只留下这样的日志:
[INFO] [06/01/2018 18:11:14.242] [cluster-system-akka.actor.default-dispatcher-4] [akka://cluster-system/user/router] Found services: akka://cluster-system/user/actor-4, akka://cluster-system/user/actor-3, akka://cluster-system/user/actor-1, akka://cluster-system/user/actor-5
为什么 MyCluster#strategy 不重新启动失败的 actor?
SupervisorStrategy#restart 的代码及其相关注释中包含了答案。
如果异常是在 actor 的 setup 阶段抛出的,直接重新启动会很危险,因为这可能造成无限重启循环。因此,对于启动阶段的监督,建议改用 restartWithBackoff。