Akka actor 获取剩余消息列表

Akka actor get remaining message list

我有一个演员在计算一些密集的东西,理想情况下只有最后一个结果应该算在内。

我希望如果它收到多个相同类型的消息A(data),只处理最后一个而丢弃前面的。

我怎样才能做到这一点?

自定义邮箱

您可以尝试实现一些自定义邮箱,包含 0 或 1 条消息:

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config

class SingleMessageQueue extends MessageQueue {
  var message = Option.empty[Envelope]
  def enqueue(receiver: ActorRef, handle: Envelope) = message = Some(handle)
  def dequeue() = {
    val handle = message.orNull
    message = None
    handle
  }
  def numberOfMessages = message.size
  def hasMessages = message.nonEmpty
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = message.foreach(deadLetters.enqueue(owner, _))
}

final case class SingleMessageMailbox() extends MailboxType with ProducesMessageQueue[SingleMessageQueue] {

  def this(settings: ActorSystem.Settings, config: Config) = this()

  override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new SingleMessageQueue
}

然后按照 the mailbox section of the docs

中的描述为您的演员启用它

分裂演员

你可以介绍一对演员。

  • Manager,收到一份工作,当它现在不工作时重新发送给 Worker
  • Worker 进行实际工作并在完成后通知其经理

示例:

import akka.actor.{Actor, ActorRef, Props}

object Worker {
  case class Job()
  case object JobDone
}

import Worker.{Job, JobDone}

class Worker extends Actor {
  override def receive = {
    case Job() ⇒
      // your long job
      context.parent ! JobDone
  }
}

class Manager extends Actor {
  var nextJob = Option.empty[(Job, ActorRef)]
  val worker = context.actorOf(Props[Worker])

  def working: Receive = {
    case job: Job ⇒ nextJob = Some((job, sender))
    case JobDone ⇒
      nextJob match {
        case Some((job, snd)) ⇒ worker.tell(job, snd)
        case None ⇒ context.become(free)
      }
  }

  def free: Receive = {
    case job: Job ⇒
      worker.tell(job, sender)
      context.become(working)
  }

  override def receive = free
}