如何确定 Akka actor/supervisor 层次结构?
How to determine Akka actor/supervisor hierarchy?
我是 Akka 的新手(Java lib,v2.3.9)。我正在尝试遵循 supervisor hierarchy best practices,但由于这是我的第一个 Akka 应用程序,我在某处遇到了心理障碍。
在我的第一个 Akka 应用程序(实际上是一个旨在跨多个应用程序重用的库)中,来自外部世界的输入表现为传递给参与者的 Process
消息。使用我的应用程序的开发人员将提供一个基于文本的配置文件,最终配置哪些 actor 被发送 Process
个实例,哪些不被发送。换句话说,假设这些是我的演员 类:
// Groovy pseudo-code
class Process {
private final Input input
Process(Input input) {
super()
this.input = deepClone(input)
}
Input getInput() {
deepClone(this.input)
}
}
class StormTrooper extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like a Storm Trooper would.
}
}
}
class DarthVader extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like Darth Vader would.
}
}
}
class Emperor extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like the Emperor would.
}
}
}
// myapp-config.json -> where the actors are configured, along with other
// app-specific configs
{
"fizzbuzz": "true",
"isYosemite": "false",
"borderColor": "red",
"processors": [
"StormTrooper",
"Emperor"
]
}
如您在配置文件中所见,仅选择 StormTrooper
和 Emperor
来接收 Process
消息。这最终导致创建零 (0) DarthVader
个参与者。这也是我的意图,这将导致 Set<ActorRef>
可用于填充有 StormTrooper
和 Emperor
的应用程序,如下所示:
class SomeApp {
SomeAppConfig config
static void main(String[] args) {
String configFileUrl = args[0] // Nevermind this horrible code
// Pretend here that configFileUrl is a valid path to
// myapp-config.json.
SomeApp app = new SomeApp(configFileUrl)
app.run()
}
SomeApp(String url) {
super()
config = new SomeAppConfig(url)
}
void run() {
// Since the config file only specifies StormTrooper and
// Emperor as viable processors, the set only contains instances of
// these ActorRef types.
Set<ActorRef> processors = config.loadProcessors()
ActorSystem actorSystem = config.getActorSystem()
while(true) {
Input input = scanForInput()
Process process = new Process(input)
// Notify each config-driven processor about the
// new input we've received that they need to process.
processors.each {
it.tell(process, Props.self()) // This isn't correct btw
}
}
}
}
因此,正如您(希望)看到的那样,我们拥有所有这些处理 Process
消息(反过来,捕获 Input
来自某些来源)。至于哪些Actor甚至alive/online来处理这些Process
消息完全是配置驱动的。最后,每次应用程序收到 Input
时,它都会被注入到 Process
消息中,并且 Process
消息会发送给所有 configured/living 参与者。
有了这个作为给定的 backstory/setup,我无法确定 "actor/supervisor hierarchy" 需要什么。在我的用例中,似乎所有参与者都是真正平等的,他们之间没有监督结构。如果该类型的 actor 被配置为存在,StormTrooper
只会收到一条 Process
消息。与其他演员 sub类.
相同
我是不是完全漏掉了什么?如果所有参与者都是平等的并且层次结构本质上是 "flat"/水平的,我该如何定义监督层次结构(用于容错目的)?
如果您想为每个演员实例化不超过一个实例 - 您可能需要 SenatorPalpatine
来监督这三个实例。如果您可能有多个 StormTrooper
- 您可能希望 JangoFett
演员负责创建(并可能杀死)他们,一些 router is also good option (it will supervise them automatically). This will also give you an ability to restart all troopers if one fails (OneForAllStrategy
),广播能力,持有一些常用统计等
使用路由器的示例 (pseudo-Scala):
//application.conf
akka.actor.deployment {
/palpatine/vader {
router = broadcast-pool
nr-of-instances = 1
}
/palpatine/troopers {
router = broadcast-pool
nr-of-instances = 10
}
}
class Palpatine extends Actor {
import context._
val troopers = actorOf(FromConfig.props(Props[Trooper],
"troopers").withSupervisorStrategy(strategy) //`strategy` is strategy for troopers
val vader = actorOf(FromConfig.props(Props[Vader]), "vader")
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1) //stategy for Palpatine's children (routers itself)
val strategy = OneForOneStrategy(maxNrOfRetries = 100, withinTimeRange = 1) //stategy for troopers
def receive = {
case p@Process => troopers ! p; vader ! p
case t@Terminted => println(t)
}
}
根据标准 akka-config 创建广播池。我还展示了你可以为他们单独定制监督策略。
如果您希望某些 actor 出于某种原因忽略消息 - 只需在 actor 中实现此逻辑,例如:
class Vader extends Actor {
def receive {
case p@Process => ...
case Ignore => context.become(ignore) //changes message handler to `ignore`
}
def ignore = {
case x => println("Ignored message " + x)
case UnIgnore => context.become(process)//changes message handler back
}
}
这将动态配置 ignore/unignore(否则它只是一个简单的 if
)。您可以根据某些配置向演员发送 Ignore
消息:
val listOfIgnorantPathes = readFromSomeConfig()
context.actorSelection(listOfIgnoredPathes) ! Ignore
如果你想从配置中控制异构广播,你也可以像 trooper 的路由器一样为 palpatine 创建广播器(只需使用组而不是池):
akka.actor.deployment {
... //vader, troopers configuration
/palpatine/broadcaster {
router = broadcast-group
routees.paths = ["/palpatine/vader", "/palpatine/troopers"]
}
}
class Palpatine extends Actor {
... //vader, troopers definitions
val broadcaster = actorOf(FromConfig.props(), "broadcaster")
def receive = {
case p@Process => broadcaster ! p
}
}
将 vader 从 routees.paths
中排除,使他不会收到 Process
消息。
P.S。 Actor 永远不会孤单 - 总是有 Guardian Actor(参见 The Top-Level Supervisors),它会在出现异常时关闭整个系统。所以无论哪种方式 SenatorPalpatine
都可能真的成为你的救星。
P.S.2 context.actorSelection("palpatine/*")
实际上允许您向所有 children 发送消息(作为广播池和组的替代),因此您不需要里面有一套。
基于 ,您仍然需要 Master
演员来复制和分发 Process
es。从概念上讲,您不会让用户(或任何正在生成您的输入的东西)为每个演员提供一次相同的输入。他们只会提供一次消息,然后您(或 Master
演员)会根据需要复制该消息并将其发送给每个适当的子演员。
正如 中所讨论的,这种方法具有增加容错能力的额外好处。
我是 Akka 的新手(Java lib,v2.3.9)。我正在尝试遵循 supervisor hierarchy best practices,但由于这是我的第一个 Akka 应用程序,我在某处遇到了心理障碍。
在我的第一个 Akka 应用程序(实际上是一个旨在跨多个应用程序重用的库)中,来自外部世界的输入表现为传递给参与者的 Process
消息。使用我的应用程序的开发人员将提供一个基于文本的配置文件,最终配置哪些 actor 被发送 Process
个实例,哪些不被发送。换句话说,假设这些是我的演员 类:
// Groovy pseudo-code
class Process {
private final Input input
Process(Input input) {
super()
this.input = deepClone(input)
}
Input getInput() {
deepClone(this.input)
}
}
class StormTrooper extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like a Storm Trooper would.
}
}
}
class DarthVader extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like Darth Vader would.
}
}
}
class Emperor extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like the Emperor would.
}
}
}
// myapp-config.json -> where the actors are configured, along with other
// app-specific configs
{
"fizzbuzz": "true",
"isYosemite": "false",
"borderColor": "red",
"processors": [
"StormTrooper",
"Emperor"
]
}
如您在配置文件中所见,仅选择 StormTrooper
和 Emperor
来接收 Process
消息。这最终导致创建零 (0) DarthVader
个参与者。这也是我的意图,这将导致 Set<ActorRef>
可用于填充有 StormTrooper
和 Emperor
的应用程序,如下所示:
class SomeApp {
SomeAppConfig config
static void main(String[] args) {
String configFileUrl = args[0] // Nevermind this horrible code
// Pretend here that configFileUrl is a valid path to
// myapp-config.json.
SomeApp app = new SomeApp(configFileUrl)
app.run()
}
SomeApp(String url) {
super()
config = new SomeAppConfig(url)
}
void run() {
// Since the config file only specifies StormTrooper and
// Emperor as viable processors, the set only contains instances of
// these ActorRef types.
Set<ActorRef> processors = config.loadProcessors()
ActorSystem actorSystem = config.getActorSystem()
while(true) {
Input input = scanForInput()
Process process = new Process(input)
// Notify each config-driven processor about the
// new input we've received that they need to process.
processors.each {
it.tell(process, Props.self()) // This isn't correct btw
}
}
}
}
因此,正如您(希望)看到的那样,我们拥有所有这些处理 Process
消息(反过来,捕获 Input
来自某些来源)。至于哪些Actor甚至alive/online来处理这些Process
消息完全是配置驱动的。最后,每次应用程序收到 Input
时,它都会被注入到 Process
消息中,并且 Process
消息会发送给所有 configured/living 参与者。
有了这个作为给定的 backstory/setup,我无法确定 "actor/supervisor hierarchy" 需要什么。在我的用例中,似乎所有参与者都是真正平等的,他们之间没有监督结构。如果该类型的 actor 被配置为存在,StormTrooper
只会收到一条 Process
消息。与其他演员 sub类.
我是不是完全漏掉了什么?如果所有参与者都是平等的并且层次结构本质上是 "flat"/水平的,我该如何定义监督层次结构(用于容错目的)?
如果您想为每个演员实例化不超过一个实例 - 您可能需要 SenatorPalpatine
来监督这三个实例。如果您可能有多个 StormTrooper
- 您可能希望 JangoFett
演员负责创建(并可能杀死)他们,一些 router is also good option (it will supervise them automatically). This will also give you an ability to restart all troopers if one fails (OneForAllStrategy
),广播能力,持有一些常用统计等
使用路由器的示例 (pseudo-Scala):
//application.conf
akka.actor.deployment {
/palpatine/vader {
router = broadcast-pool
nr-of-instances = 1
}
/palpatine/troopers {
router = broadcast-pool
nr-of-instances = 10
}
}
class Palpatine extends Actor {
import context._
val troopers = actorOf(FromConfig.props(Props[Trooper],
"troopers").withSupervisorStrategy(strategy) //`strategy` is strategy for troopers
val vader = actorOf(FromConfig.props(Props[Vader]), "vader")
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1) //stategy for Palpatine's children (routers itself)
val strategy = OneForOneStrategy(maxNrOfRetries = 100, withinTimeRange = 1) //stategy for troopers
def receive = {
case p@Process => troopers ! p; vader ! p
case t@Terminted => println(t)
}
}
根据标准 akka-config 创建广播池。我还展示了你可以为他们单独定制监督策略。
如果您希望某些 actor 出于某种原因忽略消息 - 只需在 actor 中实现此逻辑,例如:
class Vader extends Actor {
def receive {
case p@Process => ...
case Ignore => context.become(ignore) //changes message handler to `ignore`
}
def ignore = {
case x => println("Ignored message " + x)
case UnIgnore => context.become(process)//changes message handler back
}
}
这将动态配置 ignore/unignore(否则它只是一个简单的 if
)。您可以根据某些配置向演员发送 Ignore
消息:
val listOfIgnorantPathes = readFromSomeConfig()
context.actorSelection(listOfIgnoredPathes) ! Ignore
如果你想从配置中控制异构广播,你也可以像 trooper 的路由器一样为 palpatine 创建广播器(只需使用组而不是池):
akka.actor.deployment {
... //vader, troopers configuration
/palpatine/broadcaster {
router = broadcast-group
routees.paths = ["/palpatine/vader", "/palpatine/troopers"]
}
}
class Palpatine extends Actor {
... //vader, troopers definitions
val broadcaster = actorOf(FromConfig.props(), "broadcaster")
def receive = {
case p@Process => broadcaster ! p
}
}
将 vader 从 routees.paths
中排除,使他不会收到 Process
消息。
P.S。 Actor 永远不会孤单 - 总是有 Guardian Actor(参见 The Top-Level Supervisors),它会在出现异常时关闭整个系统。所以无论哪种方式 SenatorPalpatine
都可能真的成为你的救星。
P.S.2 context.actorSelection("palpatine/*")
实际上允许您向所有 children 发送消息(作为广播池和组的替代),因此您不需要里面有一套。
基于 Master
演员来复制和分发 Process
es。从概念上讲,您不会让用户(或任何正在生成您的输入的东西)为每个演员提供一次相同的输入。他们只会提供一次消息,然后您(或 Master
演员)会根据需要复制该消息并将其发送给每个适当的子演员。
正如