如何确定 Scala 中参与者之间发送的消息的优先级?
How to prioritize messages sent between actors in Scala?
我正在尝试从子 actor 捕获 'Terminate Signal',但是在死信消息池中,信号无法到达父 actor 的队列。
解决此问题的最佳方法是什么?
这是我正在处理的代码片段:
class MinerActor extends Actor {
var count:Int = 0
def receive = {
case Mine =>
//some task here
//if success
count = count + 1
//
if (count >= 100)
{
context.stop(self)
}
}
class MasterActor extends Actor {
val miner = context.actorOf(Props(new MinerActor,name = "miner")
context.watch(miner)
def receive = {
case Foo =>
while (true) {
miner ! Mine
}
case Terminated(miner) =>
println("Miner Terminated!!")
context.stop(self)
context.system.shutdown
}
}
这里 'Terminated(miner)' 案例永远不会被调用。相反,在 stdout 上,我看到了很多从 Master 发送到 Miner 的死信消息(这在 miner actor 被停止时是意料之中的)。然而,如何在 Event 总线上确定终止信号的优先级以便到达 Master Actor?
如果我将 while 循环限制为大约 200 个而不是无穷大,在 100 个死信消息之后,我会收到打印 "Miner Terminated!!" 的终止信号。但是当while循环无穷大时如何实现呢?
我是 Scala/Akka 编程新手,我的主要目标是 运行 '//some task' 成功 100 次,然后退出整个程序。这是完成该任务的好方法吗?
问题是你是无限的 while 循环阻塞了 actor 线程。因此,您的 master actor 总是停留在处理第一个到达的 Foo
消息,而永远不会处理邮箱中的任何其他消息。这样做的原因是只有一个线程负责接收消息。这有一些非常好的含义,因为如果您的状态更改仅发生在该线程内,您基本上不必担心单个参与者内的并发性。
有多种方法可以解决这个问题。我建议使用调度程序来安排重复任务。
class MasterActor extends Actor {
var minerOption: Option[ActorRef] = None
var mineMessageOption: Option[Cancellable] = None
override def preStart: Unit = {
minerOption = Some(context.actorOf(Props(new MinerActor,name = "miner")))
minerOption.foreach(context.watch(_))
import context.dispatcher
mineMessageOption = Some(context.system.scheduler.schedule(0 seconds, 1 seconds, self, Foo))
}
def receive = {
case Foo =>
minerOption.foreach {
_ ! Mine
}
case Terminated(miner) =>
println("Miner Terminated!!")
mineMessageOption.foreach(_.cancel())
context.stop(self)
context.system.shutdown
}
}
在 schedule
调用中,您可以定义消息的间隔 Foo
,因此,将向矿工发送多少条消息。
替换:
case Foo =>
while (true) {
miner ! Mine
}
和
case Foo =>
miner ! Mine
self forward Foo
我正在尝试从子 actor 捕获 'Terminate Signal',但是在死信消息池中,信号无法到达父 actor 的队列。 解决此问题的最佳方法是什么?
这是我正在处理的代码片段:
class MinerActor extends Actor {
var count:Int = 0
def receive = {
case Mine =>
//some task here
//if success
count = count + 1
//
if (count >= 100)
{
context.stop(self)
}
}
class MasterActor extends Actor {
val miner = context.actorOf(Props(new MinerActor,name = "miner")
context.watch(miner)
def receive = {
case Foo =>
while (true) {
miner ! Mine
}
case Terminated(miner) =>
println("Miner Terminated!!")
context.stop(self)
context.system.shutdown
}
}
这里 'Terminated(miner)' 案例永远不会被调用。相反,在 stdout 上,我看到了很多从 Master 发送到 Miner 的死信消息(这在 miner actor 被停止时是意料之中的)。然而,如何在 Event 总线上确定终止信号的优先级以便到达 Master Actor?
如果我将 while 循环限制为大约 200 个而不是无穷大,在 100 个死信消息之后,我会收到打印 "Miner Terminated!!" 的终止信号。但是当while循环无穷大时如何实现呢?
我是 Scala/Akka 编程新手,我的主要目标是 运行 '//some task' 成功 100 次,然后退出整个程序。这是完成该任务的好方法吗?
问题是你是无限的 while 循环阻塞了 actor 线程。因此,您的 master actor 总是停留在处理第一个到达的 Foo
消息,而永远不会处理邮箱中的任何其他消息。这样做的原因是只有一个线程负责接收消息。这有一些非常好的含义,因为如果您的状态更改仅发生在该线程内,您基本上不必担心单个参与者内的并发性。
有多种方法可以解决这个问题。我建议使用调度程序来安排重复任务。
class MasterActor extends Actor {
var minerOption: Option[ActorRef] = None
var mineMessageOption: Option[Cancellable] = None
override def preStart: Unit = {
minerOption = Some(context.actorOf(Props(new MinerActor,name = "miner")))
minerOption.foreach(context.watch(_))
import context.dispatcher
mineMessageOption = Some(context.system.scheduler.schedule(0 seconds, 1 seconds, self, Foo))
}
def receive = {
case Foo =>
minerOption.foreach {
_ ! Mine
}
case Terminated(miner) =>
println("Miner Terminated!!")
mineMessageOption.foreach(_.cancel())
context.stop(self)
context.system.shutdown
}
}
在 schedule
调用中,您可以定义消息的间隔 Foo
,因此,将向矿工发送多少条消息。
替换:
case Foo =>
while (true) {
miner ! Mine
}
和
case Foo =>
miner ! Mine
self forward Foo