Akka 调度模式

Akka scheduling patterns

考虑经典 "Word Count" 程序。它计算某个目录中所有文件中的单词数。 Master 收到一些目录并在 Worker 参与者之间拆分工作(每个 worker 使用一个文件)。这是伪代码:

class WordCountWorker extends Actor {

  def receive = {
    case FileToCount(fileName:String) =>
      val count = countWords(fileName)
      sender ! WordCount(fileName, count)
  }
}

class WordCountMaster extends Actor {
  def receive = {
    case StartCounting(docRoot) => // sending each file to worker
      val workers = createWorkers()
      fileNames = scanFiles(docRoot)
      sendToWorkers(fileNames, workers)
    case WordCount(fileName, count) => // aggregating results
      ...

  }
}

但我想 运行 这个字数统计程序按计划(例如每 1 分钟一次)提供不同的目录进行扫描。

并且 Akka 提供了很好的调度消息传递的方式:

system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))

但是当调度器通过tick发送新消息时,上述调度器的问题就开始了,但是之前的消息还没有被处理(例如我发送消息扫描一些大目录,1秒后我发送另一条消息扫描另一个目录,因此第一个目录的处理操作尚未完成)。因此,我的 WordCountMaster 将收到来自正在处理不同目录的工作人员的 WordCount 消息。

作为一种解决方法,而不是安排消息发送,我可以安排一些代码块的执行,它会在每次 new WordCountMaster 时创建。 IE。一个目录 = 一个 WordCountMaster。但我认为它效率低下,而且我还需要注意为 WordCountMaster 提供唯一名称以避免 InvalidActorNameException

所以我的问题是:我是否应该像上一段中提到的那样为每个报价单创建新的 WordCountMaster?或者有更好的ideas/patterns如何重新设计这个程序来支持调度?


一些更新: 如果为每个目录创建一个 Master actor,我会遇到一些问题:

  1. 命名演员的问题

InvalidActorNameException: actor name [WordCountMaster] is not unique!

InvalidActorNameException: actor name [WordCountWorker ] is not unique!

我可以不提供演员姓名来解决这个问题。但在这种情况下,我的演员会收到自动生成的名称,例如 $a$b 等。这对我不利。

  1. 配置问题:

我想将我的路由器配置排除到 application.conf。 IE。我想为每个 WordCountWorker 路由器提供相同的配置。但是因为我没有控制演员的名字,所以我不能使用下面的配置,因为我不知道演员的名字:

  /wordCountWorker{
    router = smallest-mailbox-pool
    nr-of-instances = 5
    dispatcher = word-counter-dispatcher
  }

我不是 Akka 专家,但我认为每个聚合都有一个 actor 的方法并不低效。您需要以某种方式保持并发聚合分离。您可以给每个聚合一个 id,以便在唯一的主 actor 中用 id 分隔它们,或者您可以使用 Akka actor 命名和生命周期逻辑,并将每个计数轮的每个聚合委托给一个将存活的 actor只是为了聚合逻辑。

对我来说,每次聚合使用一个 actor 似乎更优雅。

另请注意,Akka 具有聚合模式的实现,如所述here

您应该在 worker 中雇用 become/unbecome 功能。如果您的工作人员开始扫描大文件夹,请使用 become 更改忽略另一条消息(或不处理它的响应)的参与者行为,在目录扫描后将消息发送回字数和 unbecome标准行为。

首先。命名问题:只需动态且唯一地命名您的演员,如下所示:
WorkerActor + "-" + 文件名...或... MasterActor + "-" + 目录名
还是我遗漏了什么?

其次,为什么要调度?当第一个目录完成后开始处理下一个目录不是更合乎逻辑吗?如果需要安排时间,那么我会看到许多不同的解决方案来解决您的问题,我会尝试解决其中的一些问题:

1.
三级层次结构:
MasterActor -> DirectoryActor -> WorkerActor
为每个新目录创建一个新的目录 actor,为每个文件创建一个新的 worker。

2.
二级层次结构:
MasterActor -> WorkerActor
您为每个文件创建一个新工作器。
识别接收结果的两个选项:
a) 通过 futures
询问和汇总结果,将工作分配给工人 b) 在作业中包含消息 ID(例如目录名称)

3.
具有负载平衡的二级层次结构:
与选项 2 相同,但您不会为每个文件创建一个新的工作人员,您有固定数量的工作人员,或者使用平衡调度程序或最小的邮箱路由器。

4.
具有期货的一级层次结构:
大师级演员没有children,他确实工作并且只用期货汇总结果。

我还建议阅读 Gregor Raýman 在他的回答中建议的 Akka 聚合模式。

就个人而言,我根本不会使用 actor 来解决这个聚合问题,但无论如何,就这样吧。

我认为没有一种合理的方法可以按照您建议的方式同时处理多个目录的字数统计。相反,您应该有一个 "master-master" actor 来监督计数器。因此,您有三个演员 类:

  • FileCounter:它接收要读取的文件并处理它。完成后,它将结果发送回发件人。
  • CounterSupervisor:这个跟踪哪个 FileCounter 已经完成了他们的工作,并将结果发送回 WordCountForker。
  • WordCountForker:此参与者将跟踪哪个子系统完成了他们的任务,如果他们都很忙,则创建一个新的 CounterSupervisor 来解决问题。

文件计数器必须是最容易编写的。

class FileCounter() extends Actor with ActorLogging {

    import context.dispatcher

    override def preStart = {
        log.info("FileCounter Actor initialized")
    }

    def receive = {
        case CountFile(file) =>
            log.info("Counting file: " + file.getAbsolutePath)

            FileIO.readFile(file).foreach { data =>
                val words = data
                    .split("\n")
                    .map { _.split(" ").length }
                    .sum

                context.parent ! FileCount(words)
            }
    }
}

现在监督文件计数器的演员。

class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging {

    var total = 0
    var files: Array[File] = _
    var pendingActors = 0

    override def preStart = {
        for(i <- 1 to actorPool)
            context.actorOf(FileCounter.props(), name = s"counter$i")
    }

    def receive = {
        case CountDirectory(base) =>
            log.info("Now counting starting from directory : " + base.getAbsolutePath)
            total = 0
            files = FileIO.getAllFiles(base)
            pendingActors = 0
            for(i <- 1 to actorPool if(i < files.length)) {
                pendingActors += 1
                context.child(s"counter$i").get ! CountFile(files.head)
                files = files.tail
            }

        case FileCount(count) =>
            total += count
            pendingActors -= 1
            if(files.length > 0) {
                sender() ! CountFile(files.head)
                files = files.tail
                pendingActors += 1
            } else if(pendingActors == 0) {
                context.parent ! WordCountTotal(total)
            }
    }
}

然后是监督监督的演员

class WordCountForker(counterActors: Int) extends Actor with ActorLogging {

    var busyActors: List[(ActorRef, ActorRef)] = Nil
    var idleActors: List[ActorRef] = _

    override def preStart = {
        val first = context.actorOf(CounterSupervisor.props(counterActors))
        idleActors = List(first)
        log.info(s"Initialized first supervisor with $counterActors file counters.")
    }

    def receive = {
        case msg @ CountDirectory(dir) =>
            log.info("Count directory received")
            val counter = idleActors match {
                case Nil =>
                    context.actorOf(CounterSupervisor.props(counterActors))
                case head :: rest =>
                    idleActors = rest
                    head
            }
            counter ! msg
            busyActors = (counter, sender()) :: busyActors

        case msg @ WordCountTotal(n) =>
            val path = sender().path.toString()
            val index = busyActors.indexWhere { _._1.path.toString == path }
            val (counter, replyTo) = busyActors(index)
            replyTo ! msg
            idleActors = counter :: idleActors
            busyActors = busyActors.patch(index, Nil, 1)
    }
}

我在答案中省略了一些部分以尽可能保持简洁,如果您想查看其余代码I posted a Gist

此外,考虑到您对效率的担忧,这里的解决方案将防止每个目录有一个子系统,但如果有需要,您仍然会产生多个子系统。