Akka's actor based custom Event Bus implementation causes bottleneck

I'm trying to implement the Event Bus (Pub-Sub) pattern on top of Akka's actor model.

The "native" EventBus implementation doesn't meet some of my requirements, e.g. the ability to retain only the last message in a topic. That requirement is specific to the MQTT protocol, for which I'm implementing a message broker: https://github.com/butaji/JetMQ

The current interface of my EventBus is the following:

object Bus {
  case class Subscribe(topic: String, actor: ActorRef)
  case class Unsubscribe(topic: String, actor: ActorRef)
  case class Publish(topic: String, payload: Any, retain: Boolean = false)
}

Usage looks like this:

val system = ActorSystem("System")
val bus = system.actorOf(Props[MqttEventBus], name = "bus")
val device1 = system.actorOf(Props(new DeviceActor(bus)))
val device2 = system.actorOf(Props(new DeviceActor(bus)))

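All devices hold a reference to a single Bus actor. The Bus actor is responsible for storing all subscription and topic state (e.g. retained messages).

Device actors can decide for themselves whether to publish, subscribe, or unsubscribe to topics.

After some performance benchmarks, I realized that my current design increases the processing time between a publication and its delivery to subscribers, for these reasons:

  1. My EventBus is effectively a singleton
  2. This puts a huge message-queue processing load on it

How can I distribute (parallelize) the workload for my event bus implementation? Is the current solution a good fit for akka-cluster?

Currently, I'm thinking about routing through several Bus instances, as follows: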

val paths = (1 to 5).map(x => {
  system.actorOf(Props[EventBusActor], name = "event-bus-" + x).path.toString
})

val bus_publisher = system.actorOf(RoundRobinGroup(paths).props())
val bus_manager = system.actorOf(BroadcastGroup(paths).props())

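Here bus_publisher round-robins messages across the bus instances, while bus_manager broadcasts each message to all of them. This way, state is replicated across all the buses, and the queue of each actor shrinks because the load is distributed.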

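You could route inside your singleton bus instead of outside it. The bus could be responsible for routing messages and establishing topics, while sub-actors could be responsible for distributing the messages. A basic example that demonstrates what I'm describing, without unsubscribe functionality, duplicate subscription checks, or supervision: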

import scala.collection.mutable
import akka.actor.{Actor, ActorRef, Props}
import Bus._ // Subscribe and Publish from the question's Bus object

class HashBus extends Actor {
  // One child DistributionActor per topic, created on first subscription.
  val topicActors = mutable.Map.empty[String, ActorRef]

  def createDistributionActor: ActorRef =
    context.actorOf(Props[DistributionActor])

  override def receive: Receive = {
    case subscribe: Subscribe =>
      topicActors.getOrElseUpdate(subscribe.topic, createDistributionActor) ! subscribe

    case publish: Publish =>
      // Messages for topics that have no DistributionActor yet are dropped.
      topicActors.get(publish.topic).foreach(_ ! publish)
  }
}

class DistributionActor extends Actor {

  // Subscribers of this actor's topic.
  val recipients = mutable.ListBuffer.empty[ActorRef]

  override def receive: Receive = {
    case Subscribe(_, actorRef) =>
      recipients += actorRef

    case publish: Publish =>
      recipients.foreach(_ ! publish)
  }
}

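This ensures that your bus actor's mailbox doesn't get saturated, because the bus's only job is to do hash lookups. The DistributionActor is responsible for iterating over the recipients and distributing the payload. Likewise, the DistributionActor can hold any state for its topic.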
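As a rough sketch of what that per-topic state could look like, here is one way to model the "retain only the last message" requirement from the question. TopicState and its methods are hypothetical names, not part of Akka or of the code above. It is deliberately free of Akka types so it can be exercised without an actor system; in a DistributionActor, S would be ActorRef and M would be Publish.

```scala
// Hypothetical per-topic state; S is the subscriber handle, M the message type.
final case class TopicState[S, M](
    subscribers: Set[S] = Set.empty[S],
    retained: Option[M] = None
) {
  // Subscribing twice is a no-op because subscribers is a Set.
  // Returns the new state plus the retained message (if any) to replay to the newcomer.
  def subscribe(s: S): (TopicState[S, M], Option[M]) =
    (copy(subscribers = subscribers + s), retained)

  def unsubscribe(s: S): TopicState[S, M] =
    copy(subscribers = subscribers - s)

  // Returns the new state plus the subscribers that should receive msg.
  // Only the most recent message published with retain = true is kept.
  def publish(msg: M, retain: Boolean): (TopicState[S, M], Set[S]) = {
    val next = if (retain) copy(retained = Some(msg)) else this
    (next, subscribers)
  }
}
```

Assuming this shape, the DistributionActor would keep a single var of this type, replace it on every message, and send to each returned subscriber with `!`. Keeping the bookkeeping in an immutable value also makes the duplicate-subscription and unsubscribe cases easy to test in isolation.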