Akka actor 获取剩余消息列表
Akka actor get remaining message list
我有一个演员在计算一些密集的东西,理想情况下只有最后一个结果应该算在内。
我希望如果它收到多个相同类型的消息A(data)
,只处理最后一个而丢弃前面的。
我怎样才能做到这一点?
自定义邮箱
您可以尝试实现一些自定义邮箱,包含 0 或 1 条消息:
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config
class SingleMessageQueue extends MessageQueue {
var message = Option.empty[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope) = message = Some(handle)
def dequeue() = {
val handle = message.orNull
message = None
handle
}
def numberOfMessages = message.size
def hasMessages = message.nonEmpty
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = message.foreach(deadLetters.enqueue(owner, _))
}
final case class SingleMessageMailbox() extends MailboxType with ProducesMessageQueue[SingleMessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this()
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new SingleMessageQueue
}
然后按照 the mailbox section of the docs
中的描述为您的演员启用它
分裂演员
你可以介绍一对演员。
Manager
,收到一份工作,当它现在不工作时重新发送给 Worker
Worker
进行实际工作并在完成后通知其经理
示例:
import akka.actor.{Actor, ActorRef, Props}
object Worker {
case class Job()
case object JobDone
}
import Worker.{Job, JobDone}
class Worker extends Actor {
override def receive = {
case Job() ⇒
// your long job
context.parent ! JobDone
}
}
class Manager extends Actor {
var nextJob = Option.empty[(Job, ActorRef)]
val worker = context.actorOf(Props[Worker])
def working: Receive = {
case job: Job ⇒ nextJob = Some((job, sender))
case JobDone ⇒
nextJob match {
case Some((job, snd)) ⇒ worker.tell(job, snd)
case None ⇒ context.become(free)
}
}
def free: Receive = {
case job: Job ⇒
worker.tell(job, sender)
context.become(working)
}
override def receive = free
}
我有一个演员在计算一些密集的东西,理想情况下只有最后一个结果应该算在内。
我希望如果它收到多个相同类型的消息A(data)
,只处理最后一个而丢弃前面的。
我怎样才能做到这一点?
自定义邮箱
您可以尝试实现一些自定义邮箱,包含 0 或 1 条消息:
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config
class SingleMessageQueue extends MessageQueue {
var message = Option.empty[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope) = message = Some(handle)
def dequeue() = {
val handle = message.orNull
message = None
handle
}
def numberOfMessages = message.size
def hasMessages = message.nonEmpty
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = message.foreach(deadLetters.enqueue(owner, _))
}
final case class SingleMessageMailbox() extends MailboxType with ProducesMessageQueue[SingleMessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this()
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new SingleMessageQueue
}
然后按照 the mailbox section of the docs
中的描述为您的演员启用它分裂演员
你可以介绍一对演员。
Manager
,收到一份工作,当它现在不工作时重新发送给Worker
Worker
进行实际工作并在完成后通知其经理
示例:
import akka.actor.{Actor, ActorRef, Props}
object Worker {
case class Job()
case object JobDone
}
import Worker.{Job, JobDone}
class Worker extends Actor {
override def receive = {
case Job() ⇒
// your long job
context.parent ! JobDone
}
}
class Manager extends Actor {
var nextJob = Option.empty[(Job, ActorRef)]
val worker = context.actorOf(Props[Worker])
def working: Receive = {
case job: Job ⇒ nextJob = Some((job, sender))
case JobDone ⇒
nextJob match {
case Some((job, snd)) ⇒ worker.tell(job, snd)
case None ⇒ context.become(free)
}
}
def free: Receive = {
case job: Job ⇒
worker.tell(job, sender)
context.become(working)
}
override def receive = free
}