如何在 Like 中升级 top-most 位主管?
How to escalate top-most supervisors in Akka?
我有以下top-level(“parent-most”)演员:
// Groovy pseudo-code
class Master extends UntypedActor {
ActorRef child1
ActorRef child2
ActorRef child3
ActorRef backup
@Override
void onReceive(Object message) throws Exception {
if(message instanceof Terminated) {
Terminated terminated = message as Terminated
if(terminated.actor != backup) {
terminated.actor = backup
} else {
// TODO: What to do here? How to escalate from here?
}
} else {
child1.tell(new DoSomething(message), getSelf())
child2.tell(new DoSomethingElse(message), getSelf())
child3.tell(new DoSomethingElser(message, getSelf())
}
}
@Override
SupervisorStrategy supervisorStrategy() {
new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
@Override
Directive apply(Throwable t) throws Exception {
if(isRecoverable(t) { // Don’t worry about how/where this is defined or how it works
SupervisorStrategy.stop()
} else {
SupervisorStrategy.escalate()
}
}
})
}
}
如您所见,它监督三个 children,当这 3 个 children 抛出“可恢复”异常时,它们将停止并被备份替换。到目前为止,还不错。
我现在面临的问题是,如果备用演员抛出任何 any throwable,我想考虑这个 Master
演员(实际上,我的应用程序通常)处于无法继续处理任何输入的状态,并将异常升级到 guardian-level.
我是 Akka 的新手,不确定将这段代码放在哪里,也不知道它应该是什么样子。同样,我只需要这样的逻辑:
- 如果备份演员抛出任何可抛出的异常,将异常升级到
Master
的 parent,这实际上应该是一个 Akka “guaradian” actor/construct
第一部分是我们需要知道什么时候从备份中抛出异常;我可以处理这部分,所以让我们假设我们的策略现在是这样的:
@Override
SupervisorStrategy supervisorStrategy() {
new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
@Override
Directive apply(Throwable t) throws Exception {
if(wasThrownFromBackup(t)) {
SupervisorStrategy.escalate()
} else if(isRecoverable(t) {
SupervisorStrategy.stop()
} else {
SupervisorStrategy.escalate()
}
}
})
}
但如您所见,我仍在努力实现“演员系统之外”的升级。想法? Java 代码示例非常 首选,因为 Scala 对我来说看起来像象形文字。
在这里查看 'Reaper' 模式 http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2 抱歉,它是在 Scala 中,但我认为它很容易转换为 Java。
也看看这里,https://groups.google.com/forum/#!topic/akka-user/QG_DL7FszMU
您应该在配置中设置
akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"
这将导致任何 'top level' 升级的参与者被系统停止。然后你实现另一个名为 'Reaper'(或任何你想称呼它)的顶级 actor,它只有一个工作,观察主要的顶级 actor 并在顶级 actor 时采取行动(例如 context.system.shutdown()
)停止。
我不知道 akka java API 所以无法为您提供一个确切的例子,但是在 Scala 中,从上面的 LetItCrash 博客来看,它看起来像:
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer
object Reaper {
// Used by others to register an Actor for watching
case class WatchMe(ref: ActorRef)
}
abstract class Reaper extends Actor {
import Reaper._
// Keep track of what we're watching
val watched = ArrayBuffer.empty[ActorRef]
// Derivations need to implement this method. It's the
// hook that's called when everything's dead
def allSoulsReaped(): Unit
// Watch and check for termination
final def receive = {
case WatchMe(ref) =>
context.watch(ref)
watched += ref
case Terminated(ref) =>
watched -= ref
if (watched.isEmpty) allSoulsReaped()
}
}
class ProductionReaper extends Reaper {
// Shutdown
def allSoulsReaped(): Unit = context.system.shutdown()
}
在你的应用程序启动时,你创建了你的 master actor,创建了你的收割者,向收割者发送了一个 WatchMe(masterActor)
消息。
我有以下top-level(“parent-most”)演员:
// Groovy pseudo-code
class Master extends UntypedActor {
ActorRef child1
ActorRef child2
ActorRef child3
ActorRef backup
@Override
void onReceive(Object message) throws Exception {
if(message instanceof Terminated) {
Terminated terminated = message as Terminated
if(terminated.actor != backup) {
terminated.actor = backup
} else {
// TODO: What to do here? How to escalate from here?
}
} else {
child1.tell(new DoSomething(message), getSelf())
child2.tell(new DoSomethingElse(message), getSelf())
child3.tell(new DoSomethingElser(message, getSelf())
}
}
@Override
SupervisorStrategy supervisorStrategy() {
new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
@Override
Directive apply(Throwable t) throws Exception {
if(isRecoverable(t) { // Don’t worry about how/where this is defined or how it works
SupervisorStrategy.stop()
} else {
SupervisorStrategy.escalate()
}
}
})
}
}
如您所见,它监督三个 children,当这 3 个 children 抛出“可恢复”异常时,它们将停止并被备份替换。到目前为止,还不错。
我现在面临的问题是,如果备用演员抛出任何 any throwable,我想考虑这个 Master
演员(实际上,我的应用程序通常)处于无法继续处理任何输入的状态,并将异常升级到 guardian-level.
我是 Akka 的新手,不确定将这段代码放在哪里,也不知道它应该是什么样子。同样,我只需要这样的逻辑:
- 如果备份演员抛出任何可抛出的异常,将异常升级到
Master
的 parent,这实际上应该是一个 Akka “guaradian” actor/construct
第一部分是我们需要知道什么时候从备份中抛出异常;我可以处理这部分,所以让我们假设我们的策略现在是这样的:
@Override
SupervisorStrategy supervisorStrategy() {
new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
@Override
Directive apply(Throwable t) throws Exception {
if(wasThrownFromBackup(t)) {
SupervisorStrategy.escalate()
} else if(isRecoverable(t) {
SupervisorStrategy.stop()
} else {
SupervisorStrategy.escalate()
}
}
})
}
但如您所见,我仍在努力实现“演员系统之外”的升级。想法? Java 代码示例非常 首选,因为 Scala 对我来说看起来像象形文字。
在这里查看 'Reaper' 模式 http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2 抱歉,它是在 Scala 中,但我认为它很容易转换为 Java。
也看看这里,https://groups.google.com/forum/#!topic/akka-user/QG_DL7FszMU
您应该在配置中设置
akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"
这将导致任何 'top level' 升级的参与者被系统停止。然后你实现另一个名为 'Reaper'(或任何你想称呼它)的顶级 actor,它只有一个工作,观察主要的顶级 actor 并在顶级 actor 时采取行动(例如 context.system.shutdown()
)停止。
我不知道 akka java API 所以无法为您提供一个确切的例子,但是在 Scala 中,从上面的 LetItCrash 博客来看,它看起来像:
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer
object Reaper {
// Used by others to register an Actor for watching
case class WatchMe(ref: ActorRef)
}
abstract class Reaper extends Actor {
import Reaper._
// Keep track of what we're watching
val watched = ArrayBuffer.empty[ActorRef]
// Derivations need to implement this method. It's the
// hook that's called when everything's dead
def allSoulsReaped(): Unit
// Watch and check for termination
final def receive = {
case WatchMe(ref) =>
context.watch(ref)
watched += ref
case Terminated(ref) =>
watched -= ref
if (watched.isEmpty) allSoulsReaped()
}
}
class ProductionReaper extends Reaper {
// Shutdown
def allSoulsReaped(): Unit = context.system.shutdown()
}
在你的应用程序启动时,你创建了你的 master actor,创建了你的收割者,向收割者发送了一个 WatchMe(masterActor)
消息。