Actor 可以在特定条件下读取消息吗?
Can actors read messages under a certain condition?
我遇到这种情况:
- ActorA 每 30-40 秒发送一次 ActorB start/stop 消息
- ActorA 发送 ActorB 字符串进行打印(总是)
- ActorB 必须打印他收到的字符串,但前提是 ActorA 只发送一条开始消息
现在我想知道我是否可以做以下事情:
- ActorB 能否仅在特定条件下(如果布尔值设置为 true)读取消息而不会丢失他在该布尔值设置为 false 时收到的消息?
- ActorB 能否在其他字符串消息之前读取来自 ActorA 的 start/stop 消息?我想要这种情况:ActorA 向 ActorB 发送开始消息,ActorB 开始打印他在开始消息之前收到的字符串(并且仍在接收),然后在收到停止消息后立即停止?
不知道我解释的好不好
编辑:谢谢,答案很好,但我仍有一些疑问。
是否会保持消息的顺序?我的意思是,如果我有 "Start-M1-M2-Stop-M3-M4-M5-Start-M6-M7-Stop",打印顺序是 "M1-M2" 然后是 "M3-M4-M5-M6-M7" 还是 M6 可以在 M3、M4 和 M5 之前读取(如果 M6 在成为之后收到) ?
我可以给 start/stop 消息更高的优先级吗?如果 ActorB 收到 "M1-M2-M3",然后它在打印时收到停止消息 "M1",我希望 ActorB 再次保存 M2 和 M3。
检查 Become/Unbecome 功能。它可以让你改变演员的行为。
如果我理解正确的话,您希望您的 ActorB 具有两种不同的状态。在第一个状态,它应该缓存它接收到的消息。在第二种状态下,它应该打印所有缓存的消息并开始打印所有新消息。
像这样:
case class Start
case class Stop
case class Message(val: String)
class ActorB extends Actor {
var cache = Queue()
def initial: Receive = {
case Message(msg) => cache += msg
case Start =>
for (m <- cache) self ! Message(m)
context.become(printing)
}
def printing: Receive = {
case Message(msg) => println(msg)
case Stop => context.become(initial) //or stop the actor
}
def receive = initial
}
让 Actor B 在两种状态(两种不同的行为)之间交替。在初始 'pending' 状态下,它等待 'start' 消息,而 stashing 任何其他消息。
收到 'start' 消息后,取消存储所有存储的消息和 become 'active',等待 'stop' 消息并写出收到的其他消息(这将包括未隐藏的)。收到 'stop' 消息后,恢复到 'pending' 行为。
我的一些想法
是的,如果布尔标志是从数据库或配置文件等系统资源中获取的,但我认为它不应该依赖于任何外部消息,因为参与者从多个其他演员。如果ActorB只被ActorA使用,两者可以合并为一个
同1,如何处理来自多个actor的消息?如果只有一个actorA,可以合并两个actor。如果有多个,可以在数据库中设置flag,actorA将db中的flag改为"Start"或"Stop"。 Actor B 将根据标志打印或不打印。
一个演员应该在其他演员的状态下非常独立地做某事。开始和停止实际上是ActorA的某种状态而不是ActorB
您完全可以使用 Akka 的 Stash
特性和 become/unbecome
功能来解决您的问题。思路如下:
当您收到 Stop
消息时,您会切换到一种行为,即存储所有不是 Start
的消息。当您收到一条 Start
消息时,您会切换到打印所有收到的消息的行为,此外您还取消存储同时到达的所有消息。
case object Start
case object Stop
case object TriggerStateChange
case object SendMessage
class ActorB extends Actor with Stash {
override def receive: Receive = {
case Start =>
context.become(printingBehavior, false)
unstashAll()
case x => stash()
}
def printingBehavior: Receive = {
case msg: String => println(msg)
case Stop => context.unbecome()
}
}
class ActorA(val actorB: ActorRef) extends Actor {
var counter = 0
var started = false
override def preStart: Unit = {
import context.dispatcher
this.context.system.scheduler.schedule(0 seconds, 5 seconds, self, TriggerStateChange)
this.context.system.scheduler.schedule(0 seconds, 1 seconds, self, SendMessage)
}
override def receive: Actor.Receive = {
case SendMessage =>
actorB ! "Message: " + counter
counter += 1
case TriggerStateChange =>
actorB ! (if (started) {
started = false
Stop
} else {
started = true
Start
})
}
}
object Akka {
def main(args: Array[String]) = {
val system = ActorSystem.create("TestActorSystem")
val actorB = system.actorOf(Props(classOf[ActorB]), "ActorB")
val actorA = system.actorOf(Props(classOf[ActorA], actorB), "ActorA")
system.awaitTermination()
}
}
你已经有了很多好的答案,但不知何故我觉得有必要提供更简短的答案,因为你需要的不一定是状态机:
class ActorB extends Actor {
def receive: Receive = caching(Nil)
def caching(cached: List[String]): Receive = {
case msg: String =>
context.become(caching(msg :: cached))
case Start =>
cached.reverse.foreach(println)
context.become(caching(Nil))
}
}
我遇到这种情况:
- ActorA 每 30-40 秒发送一次 ActorB start/stop 消息
- ActorA 发送 ActorB 字符串进行打印(总是)
- ActorB 必须打印他收到的字符串,但前提是 ActorA 只发送一条开始消息
现在我想知道我是否可以做以下事情:
- ActorB 能否仅在特定条件下(如果布尔值设置为 true)读取消息而不会丢失他在该布尔值设置为 false 时收到的消息?
- ActorB 能否在其他字符串消息之前读取来自 ActorA 的 start/stop 消息?我想要这种情况:ActorA 向 ActorB 发送开始消息,ActorB 开始打印他在开始消息之前收到的字符串(并且仍在接收),然后在收到停止消息后立即停止?
不知道我解释的好不好
编辑:谢谢,答案很好,但我仍有一些疑问。
是否会保持消息的顺序?我的意思是,如果我有 "Start-M1-M2-Stop-M3-M4-M5-Start-M6-M7-Stop",打印顺序是 "M1-M2" 然后是 "M3-M4-M5-M6-M7" 还是 M6 可以在 M3、M4 和 M5 之前读取(如果 M6 在成为之后收到) ?
我可以给 start/stop 消息更高的优先级吗?如果 ActorB 收到 "M1-M2-M3",然后它在打印时收到停止消息 "M1",我希望 ActorB 再次保存 M2 和 M3。
检查 Become/Unbecome 功能。它可以让你改变演员的行为。
如果我理解正确的话,您希望您的 ActorB 具有两种不同的状态。在第一个状态,它应该缓存它接收到的消息。在第二种状态下,它应该打印所有缓存的消息并开始打印所有新消息。
像这样:
case class Start
case class Stop
case class Message(val: String)
class ActorB extends Actor {
var cache = Queue()
def initial: Receive = {
case Message(msg) => cache += msg
case Start =>
for (m <- cache) self ! Message(m)
context.become(printing)
}
def printing: Receive = {
case Message(msg) => println(msg)
case Stop => context.become(initial) //or stop the actor
}
def receive = initial
}
让 Actor B 在两种状态(两种不同的行为)之间交替。在初始 'pending' 状态下,它等待 'start' 消息,而 stashing 任何其他消息。
收到 'start' 消息后,取消存储所有存储的消息和 become 'active',等待 'stop' 消息并写出收到的其他消息(这将包括未隐藏的)。收到 'stop' 消息后,恢复到 'pending' 行为。
我的一些想法
是的,如果布尔标志是从数据库或配置文件等系统资源中获取的,但我认为它不应该依赖于任何外部消息,因为参与者从多个其他演员。如果ActorB只被ActorA使用,两者可以合并为一个
同1,如何处理来自多个actor的消息?如果只有一个actorA,可以合并两个actor。如果有多个,可以在数据库中设置flag,actorA将db中的flag改为"Start"或"Stop"。 Actor B 将根据标志打印或不打印。
一个演员应该在其他演员的状态下非常独立地做某事。开始和停止实际上是ActorA的某种状态而不是ActorB
您完全可以使用 Akka 的 Stash
特性和 become/unbecome
功能来解决您的问题。思路如下:
当您收到 Stop
消息时,您会切换到一种行为,即存储所有不是 Start
的消息。当您收到一条 Start
消息时,您会切换到打印所有收到的消息的行为,此外您还取消存储同时到达的所有消息。
case object Start
case object Stop
case object TriggerStateChange
case object SendMessage
class ActorB extends Actor with Stash {
override def receive: Receive = {
case Start =>
context.become(printingBehavior, false)
unstashAll()
case x => stash()
}
def printingBehavior: Receive = {
case msg: String => println(msg)
case Stop => context.unbecome()
}
}
class ActorA(val actorB: ActorRef) extends Actor {
var counter = 0
var started = false
override def preStart: Unit = {
import context.dispatcher
this.context.system.scheduler.schedule(0 seconds, 5 seconds, self, TriggerStateChange)
this.context.system.scheduler.schedule(0 seconds, 1 seconds, self, SendMessage)
}
override def receive: Actor.Receive = {
case SendMessage =>
actorB ! "Message: " + counter
counter += 1
case TriggerStateChange =>
actorB ! (if (started) {
started = false
Stop
} else {
started = true
Start
})
}
}
object Akka {
def main(args: Array[String]) = {
val system = ActorSystem.create("TestActorSystem")
val actorB = system.actorOf(Props(classOf[ActorB]), "ActorB")
val actorA = system.actorOf(Props(classOf[ActorA], actorB), "ActorA")
system.awaitTermination()
}
}
你已经有了很多好的答案,但不知何故我觉得有必要提供更简短的答案,因为你需要的不一定是状态机:
class ActorB extends Actor {
def receive: Receive = caching(Nil)
def caching(cached: List[String]): Receive = {
case msg: String =>
context.become(caching(msg :: cached))
case Start =>
cached.reverse.foreach(println)
context.become(caching(Nil))
}
}