如何保证分叉的顺序性

How to guarantee sequentiality for forks in akka

我们正在为每个(小的)传入消息组创建一个参与者链,以保证它们的顺序处理和管道(组通过公共 ID 区分)。问题是我们的链有分叉,比如 A1 -> (A2 -> A3 | A4 -> A5),我们应该保证通过 A2 -> A3A4 -> A5 的消息之间没有竞争。当前的遗留解决方案是阻止 A1 actor 直到当前消息被完全处理(在其中一个子链中):

def receive { //pseudocode
    case x => ...
      val f = A2orA4 ? msg
      Await.complete(f, timeout)
}

因此,应用程序中的线程数与正在处理的消息数成正比,无论这些消息是活动的还是正在异步等待来自外部服务的某些响应。它与 fork-join(或任何其他动态)池一起工作大约两年,但当然不能与固定池一起工作,并且在高负载的情况下会极大地降低性能。不仅如此,它还会影响 GC,因为每个阻塞的 fork-actor 都会在内部保存冗余的先前消息的状态。

即使有背压,它也会创建比收到的消息多 N 倍的线程(因为流程中有 N 个顺序分叉),这仍然很糟糕,因为处理一条消息需要很长时间,但不会太多 CPU .所以我们应该处理尽可能多的消息,因为我们有足够的内存。我想出的第一个解决方案 - 将链线性化为 A1 -> A2 -> A3 -> A4 -> A5。还有更好的吗?

更简单的解决方案是将最后收到的消息的未来存储到参与者的状态中,并将其与之前的未来链接起来:

def receive = process(Future{new Ack}) //completed future
def process(prevAck: Future[Ack]): Receive = { //pseudocode
    case x => ...
        context become process(prevAck.flatMap(_ => A2orA4 ? msg))
}

因此它将创建没有任何阻塞的期货链。期货完成后链将被删除(最后一个除外)。