akka actors 之间的负载均衡
Workload balancing between akka actors
我有 2 个 akka actor 用于抓取链接,即找到页面 X 中的所有链接,然后找到从 X 链接的所有页面中的所有链接,等等...
我希望他们或多或少以相同的速度进步,但往往其中一个饿死而另一个消耗所有资源。
我尝试了以下方法(已简化)。
单页抓取由以下参与者完成:
class Crawler extends Actor {
def receive = {
case Crawl(url, kind) =>
// download url
// extract links
sender ! Parsed(url, links, kind)
}
}
方法一:
class Coordinator extends Actor {
val linksA = ...
val linksB = ...
def receive = {
case Parsed(url, links, kind) =>
val store = if (kind == kindA) linksA else linksB
val newLinks = links -- store
store ++= links
newLinks.foreach { link =>
val crawler = context.actorOf(Props[Crawler])
crawler ! Crawl(link, kind)
}
}
}
方法二:
class Coordinator extends Actor {
val linksA = ...
val linksB = ...
val rrProps = Props[Crawler].withRouter(RoundRobinRouter(nrOfInstances = 10)
val crawlerA = context.actorOf(rrProps)
val crawlerB = context.actorOf(rrProps)
def receive = {
case Parsed(url, links, kind) =>
val store = if (kind == kindA) linksA else linksB
val newLinks = links -- store
store ++= links
newLinks.foreach { link =>
if (kind == kindA) crawlerA ! Crawl(link, kind)
else crawlerB ! Crawl(link, kind)
}
}
}
第二种方法稍微好一点,但没有完全解决。
有什么好的方法可以让两种爬虫的进度大致相同?我是否应该在它们之间依次发送消息来解除阻塞?
我正在开发一个类似的程序,其中工作人员的资源成本不统一(在我的例子中,任务是执行数据库查询并将结果转储到另一个数据库中,但正如爬行不同的网站会有不同的成本,不同的查询也会有不同的成本)。我采用的两种处理方法:
- 将
RoundRobinRouter
替换为 SmallestMailboxRouter
- 不要让
Coordinator
一次发送所有消息 - 而是分批发送它们,在您的情况下,您有 10 个工作人员,因此发送 40 条消息应该让他们一开始很忙。每当工作人员完成一项任务时,它都会向 Coordinator
发送一条消息,此时 Coordinator
会发出另一条消息,该消息可能会发送给刚刚完成其任务的工作人员。 (您也可以分批执行此操作,即在收到 n
"task complete" 消息后 Coordinator
发送另一条 n
消息,但不要同时发送 n
高,否则一些任务极短的工人可能会闲置。)
第三种选择是欺骗并在所有参与者之间共享一个 ConcurrentLinkedQueue
:在填充队列后,Coordinator
向工作人员发送一条 "start" 消息,然后工作人员进行轮询队列直到它为空。
我也会考虑使用 Work Pulling 模式。此处对其进行了很好的描述,例如:"Akka Work Pulling Pattern to prevent mailbox overflow, throttle and distribute work"。
Zim-Zam 的答案中的方式 #2 基本上是相同的方法。
因此,当新解析的 link 可用时,您可以只宣布这些内容可用,而不是立即 "pushing" 将所有任务发送给您的爬虫,然后它们将 "pull"在他们准备好时进行这项工作。
此外,在协调器端,如果需要,您可以添加一些更复杂的逻辑来优先处理某些 link 类型的工作(例如,使用 PriorityQueue
处理未决的 links)。
使用 SmallestMailboxRouter
确实 稍微 会更好,但本质上它仍然基于 "pushing" 方法。
我有 2 个 akka actor 用于抓取链接,即找到页面 X 中的所有链接,然后找到从 X 链接的所有页面中的所有链接,等等...
我希望他们或多或少以相同的速度进步,但往往其中一个饿死而另一个消耗所有资源。
我尝试了以下方法(已简化)。 单页抓取由以下参与者完成:
class Crawler extends Actor {
def receive = {
case Crawl(url, kind) =>
// download url
// extract links
sender ! Parsed(url, links, kind)
}
}
方法一:
class Coordinator extends Actor {
val linksA = ...
val linksB = ...
def receive = {
case Parsed(url, links, kind) =>
val store = if (kind == kindA) linksA else linksB
val newLinks = links -- store
store ++= links
newLinks.foreach { link =>
val crawler = context.actorOf(Props[Crawler])
crawler ! Crawl(link, kind)
}
}
}
方法二:
class Coordinator extends Actor {
val linksA = ...
val linksB = ...
val rrProps = Props[Crawler].withRouter(RoundRobinRouter(nrOfInstances = 10)
val crawlerA = context.actorOf(rrProps)
val crawlerB = context.actorOf(rrProps)
def receive = {
case Parsed(url, links, kind) =>
val store = if (kind == kindA) linksA else linksB
val newLinks = links -- store
store ++= links
newLinks.foreach { link =>
if (kind == kindA) crawlerA ! Crawl(link, kind)
else crawlerB ! Crawl(link, kind)
}
}
}
第二种方法稍微好一点,但没有完全解决。
有什么好的方法可以让两种爬虫的进度大致相同?我是否应该在它们之间依次发送消息来解除阻塞?
我正在开发一个类似的程序,其中工作人员的资源成本不统一(在我的例子中,任务是执行数据库查询并将结果转储到另一个数据库中,但正如爬行不同的网站会有不同的成本,不同的查询也会有不同的成本)。我采用的两种处理方法:
- 将
RoundRobinRouter
替换为SmallestMailboxRouter
- 不要让
Coordinator
一次发送所有消息 - 而是分批发送它们,在您的情况下,您有 10 个工作人员,因此发送 40 条消息应该让他们一开始很忙。每当工作人员完成一项任务时,它都会向Coordinator
发送一条消息,此时Coordinator
会发出另一条消息,该消息可能会发送给刚刚完成其任务的工作人员。 (您也可以分批执行此操作,即在收到n
"task complete" 消息后Coordinator
发送另一条n
消息,但不要同时发送n
高,否则一些任务极短的工人可能会闲置。)
第三种选择是欺骗并在所有参与者之间共享一个 ConcurrentLinkedQueue
:在填充队列后,Coordinator
向工作人员发送一条 "start" 消息,然后工作人员进行轮询队列直到它为空。
我也会考虑使用 Work Pulling 模式。此处对其进行了很好的描述,例如:"Akka Work Pulling Pattern to prevent mailbox overflow, throttle and distribute work"。 Zim-Zam 的答案中的方式 #2 基本上是相同的方法。
因此,当新解析的 link 可用时,您可以只宣布这些内容可用,而不是立即 "pushing" 将所有任务发送给您的爬虫,然后它们将 "pull"在他们准备好时进行这项工作。
此外,在协调器端,如果需要,您可以添加一些更复杂的逻辑来优先处理某些 link 类型的工作(例如,使用 PriorityQueue
处理未决的 links)。
使用 SmallestMailboxRouter
确实 稍微 会更好,但本质上它仍然基于 "pushing" 方法。