演员被随机触发
Actor getting triggered randomly
我正在使用 Scala 运行宁 AKKA 演员(版本 2.3.9)API。
我有一堆需要每 30 分钟触发一次的异构 Actor。我看到在单个 运行 中,并不是所有的演员都被触发了。这完全是随机的。可以这么说,每个演员都不做任何重量级的任务。他们对 NoSQL 存储进行大量读取和少量写入。不确定这里的实际问题是什么。我觉得有些地方我没有使用理想的方法。
这是代码:
val system = ActorSystem("pumpkinx-akka")
import system.dispatcher
val noOfActors = 50
val allActors = List(
system.actorOf(Props[a.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "aTriggerActor"),
system.actorOf(Props[b.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "bTriggerActor"),
system.actorOf(Props[c.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "cTriggerActor"),
system.actorOf(Props[d.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "dTriggerActor"),
system.actorOf(Props[e.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "eTriggerActor"))
def trigger = allActors.foreach(_ ! new Start)
system.scheduler.schedule(0 seconds, 30 minutes)(trigger)
system.awaitTermination()
您已经创建了 5 个路由器,每个路由器有 50 个参与者,所以它是 250 个 *.actors.TriggerActor
。如果你想在单个 运行 中向所有 250 个人发送一条消息,你应该:
def trigger = (1 to 50).foreach(_ => allActors.foreach(_ ! new Start))
它将向每个路由器发送 50 条消息。由于它是循环法,到达路由器的第一条消息将发送给它的第一个参与者,第二个 - 到第二个,依此类推,直到第 50 个参与者收到消息。
Just allActors.foreach(_ ! new Start)
只向 50 位演员中的一位发送消息 - 不是全部,没有广播。例如,a ! Start
只是将消息发送到 a.actors.TriggerActor
的实例之一
P.S。我的造型:
class Trigger extends Actor {
def receive = {
case x => println(context.parent.path.name + " " + self.path.name + " " + x)
}
}
defined class Trigger
val system = ActorSystem("pumpkinx-akka")
val allActors = List(
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "aTriggerActor"),
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "bTriggerActor"),
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "cTriggerActor"),
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "dTriggerActor"),
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "eTriggerActor"))
scala> allActors.foreach(_ ! "m") //everyone received a message
aTriggerActor $a m
bTriggerActor $a m
cTriggerActor $a m
dTriggerActor $a m
eTriggerActor $a m
scala> (0 to 5).foreach(_ => allActors(1) ! "m") //only to b-router
bTriggerActor $b m //bcdefg (round-robin)
bTriggerActor $g m
bTriggerActor $f m
bTriggerActor $d m
bTriggerActor $e m
bTriggerActor $c m
scala> (0 to 5).foreach(_ => allActors(1) ! "m") //only to b-router
bTriggerActor $h m //hijklm (round-robin)
bTriggerActor $l m
bTriggerActor $j m
bTriggerActor $k m
bTriggerActor $i m
bTriggerActor $m m
P.S。小心此类路由器内部的异常(更准确地说,内部工作人员)。如果路由器是顶级参与者 - 故障将传播到监护人,这会迫使整个系统关闭。
P.S。如果你还想要异构参与者的循环 - 使用 RoundRobinGroup
, see examples
我正在使用 Scala 运行宁 AKKA 演员(版本 2.3.9)API。
我有一堆需要每 30 分钟触发一次的异构 Actor。我看到在单个 运行 中,并不是所有的演员都被触发了。这完全是随机的。可以这么说,每个演员都不做任何重量级的任务。他们对 NoSQL 存储进行大量读取和少量写入。不确定这里的实际问题是什么。我觉得有些地方我没有使用理想的方法。
这是代码:
val system = ActorSystem("pumpkinx-akka")
import system.dispatcher
val noOfActors = 50
val allActors = List(
system.actorOf(Props[a.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "aTriggerActor"),
system.actorOf(Props[b.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "bTriggerActor"),
system.actorOf(Props[c.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "cTriggerActor"),
system.actorOf(Props[d.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "dTriggerActor"),
system.actorOf(Props[e.actors.TriggerActor].withRouter(new RoundRobinRouter(noOfActors)), "eTriggerActor"))
def trigger = allActors.foreach(_ ! new Start)
system.scheduler.schedule(0 seconds, 30 minutes)(trigger)
system.awaitTermination()
您已经创建了 5 个路由器,每个路由器有 50 个参与者,所以它是 250 个 *.actors.TriggerActor
。如果你想在单个 运行 中向所有 250 个人发送一条消息,你应该:
def trigger = (1 to 50).foreach(_ => allActors.foreach(_ ! new Start))
它将向每个路由器发送 50 条消息。由于它是循环法,到达路由器的第一条消息将发送给它的第一个参与者,第二个 - 到第二个,依此类推,直到第 50 个参与者收到消息。
Just allActors.foreach(_ ! new Start)
只向 50 位演员中的一位发送消息 - 不是全部,没有广播。例如,a ! Start
只是将消息发送到 a.actors.TriggerActor
P.S。我的造型:
class Trigger extends Actor {
def receive = {
case x => println(context.parent.path.name + " " + self.path.name + " " + x)
}
}
defined class Trigger
val system = ActorSystem("pumpkinx-akka")
val allActors = List(
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "aTriggerActor"),
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "bTriggerActor"),
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "cTriggerActor"),
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "dTriggerActor"),
system.actorOf(Props[Trigger].withRouter(new RoundRobinRouter(noOfActors)), "eTriggerActor"))
scala> allActors.foreach(_ ! "m") //everyone received a message
aTriggerActor $a m
bTriggerActor $a m
cTriggerActor $a m
dTriggerActor $a m
eTriggerActor $a m
scala> (0 to 5).foreach(_ => allActors(1) ! "m") //only to b-router
bTriggerActor $b m //bcdefg (round-robin)
bTriggerActor $g m
bTriggerActor $f m
bTriggerActor $d m
bTriggerActor $e m
bTriggerActor $c m
scala> (0 to 5).foreach(_ => allActors(1) ! "m") //only to b-router
bTriggerActor $h m //hijklm (round-robin)
bTriggerActor $l m
bTriggerActor $j m
bTriggerActor $k m
bTriggerActor $i m
bTriggerActor $m m
P.S。小心此类路由器内部的异常(更准确地说,内部工作人员)。如果路由器是顶级参与者 - 故障将传播到监护人,这会迫使整个系统关闭。
P.S。如果你还想要异构参与者的循环 - 使用 RoundRobinGroup
, see examples