akka actors 之间的负载均衡

Workload balancing between akka actors

我有 2 个 akka actor 用于抓取链接,即找到页面 X 中的所有链接,然后找到从 X 链接的所有页面中的所有链接,等等...

我希望他们或多或少以相同的速度进步,但往往其中一个饿死而另一个消耗所有资源。

我尝试了以下方法(已简化)。 单页抓取由以下参与者完成:

class Crawler extends Actor {
  def receive = {
    case Crawl(url, kind) =>
      // download url
      // extract links
      sender ! Parsed(url, links, kind)
  }
}

方法一:

class Coordinator extends Actor {
  val linksA = ...
  val linksB = ...
  def receive = {
    case Parsed(url, links, kind) =>
      val store = if (kind == kindA) linksA else linksB
      val newLinks = links -- store
      store ++= links
      newLinks.foreach { link =>
        val crawler = context.actorOf(Props[Crawler])
        crawler ! Crawl(link, kind)
      }
  }
}

方法二:

class Coordinator extends Actor {
  val linksA = ...
  val linksB = ...
  val rrProps = Props[Crawler].withRouter(RoundRobinRouter(nrOfInstances = 10)
  val crawlerA = context.actorOf(rrProps)
  val crawlerB = context.actorOf(rrProps)
  def receive = {
    case Parsed(url, links, kind) =>
      val store = if (kind == kindA) linksA else linksB
      val newLinks = links -- store
      store ++= links
      newLinks.foreach { link =>
        if (kind == kindA) crawlerA ! Crawl(link, kind)
        else crawlerB ! Crawl(link, kind)
      }
  }
}

第二种方法稍微好一点,但没有完全解决。

有什么好的方法可以让两种爬虫的进度大致相同?我是否应该在它们之间依次发送消息来解除阻塞?

我正在开发一个类似的程序,其中工作人员的资源成本不统一(在我的例子中,任务是执行数据库查询并将结果转储到另一个数据库中,但正如爬行不同的网站会有不同的成本,不同的查询也会有不同的成本)。我采用的两种处理方法:

  1. RoundRobinRouter 替换为 SmallestMailboxRouter
  2. 不要让 Coordinator 一次发送所有消息 - 而是分批发送它们,在您的情况下,您有 10 个工作人员,因此发送 40 条消息应该让他们一开始很忙。每当工作人员完成一项任务时,它都会向 Coordinator 发送一条消息,此时 Coordinator 会发出另一条消息,该消息可能会发送给刚刚完成其任务的工作人员。 (您也可以分批执行此操作,即在收到 n "task complete" 消息后 Coordinator 发送另一条 n 消息,但不要同时发送 n高,否则一些任务极短的工人可能会闲置。)

第三种选择是欺骗并在所有参与者之间共享一个 ConcurrentLinkedQueue:在填充队列后,Coordinator 向工作人员发送一条 "start" 消息,然后工作人员进行轮询队列直到它为空。

我也会考虑使用 Work Pulling 模式。此处对其进行了很好的描述,例如:"Akka Work Pulling Pattern to prevent mailbox overflow, throttle and distribute work"。 Zim-Zam 的答案中的方式 #2 基本上是相同的方法。

因此,当新解析的 link 可用时,您可以只宣布这些内容可用,而不是立即 "pushing" 将所有任务发送给您的爬虫,然后它们将 "pull"在他们准备好时进行这项工作。

此外,在协调器端,如果需要,您可以添加一些更复杂的逻辑来优先处理某些 link 类型的工作(例如,使用 PriorityQueue 处理未决的 links)。

使用 SmallestMailboxRouter 确实 稍微 会更好,但本质上它仍然基于 "pushing" 方法。