Akka 基于 actor 的自定义事件总线实现导致瓶颈
Akka's actor based custom Event Bus implementation causes bottleneck
我正在尝试在 Akka 的参与者模型之上实现事件总线(Pub-Sub)模式。
"Native" EventBus implementation doesn't meet some of my requirements (e.g. possibility of retaining only last message in a topic, it's specific for MQTT protocol, I'm implementing message broker for it https://github.com/butaji/JetMQ).
我的EventBus当前接口如下:
object Bus {
case class Subscribe(topic: String, actor: ActorRef)
case class Unsubscribe(topic: String, actor: ActorRef)
case class Publish(topic: String, payload: Any, retain: Boolean = false)
}
用法如下:
val system = ActorSystem("System")
val bus = system.actorOf(Props[MqttEventBus], name = "bus")
val device1 = system.actorOf(Props(new DeviceActor(bus)))
val device2 = system.actorOf(Props(new DeviceActor(bus)))
所有 设备 都引用了单个总线 actor。 Bus actor 负责存储订阅和主题的所有状态(例如保留消息)。
Device actors 可以自行决定发布、订阅或取消订阅主题。
经过一些性能基准测试后,我意识到我当前的设计会影响发布和订阅之间的处理时间,原因如下:
- 我的EventBus其实是单例的
- 它造成了巨大的队列处理负载
如何为我的事件总线实现分配(并行化)工作负载?
目前的解决方案是否适合 akka-cluster?
目前,我正在考虑 routing 通过以下几个总线实例:
val paths = (1 to 5).map(x => {
system.actorOf(Props[EventBusActor], name = "event-bus-" + x).path.toString
})
val bus_publisher = system.actorOf(RoundRobinGroup(paths).props())
val bus_manager = system.actorOf(BroadcastGroup(paths).props())
其中:
- bus_publisher会负责获取Publish,
- bus_manager 将负责获取订阅/取消订阅。
如下所示,它将跨所有总线复制状态,并根据负载分布减少每个参与者的队列。
您可以在单例总线内部而不是外部路由。您的总线可能负责路由消息和建立主题,而 sub-Actors 可能负责分发消息。一个演示我所描述内容但没有取消订阅功能、重复订阅检查或监督的基本示例:
import scala.collection.mutable
import akka.actor.{Actor, ActorRef}
class HashBus() extends Actor {
val topicActors = mutable.Map.empty[String, ActorRef]
def createDistributionActor = {
context.actorOf(Props[DistributionActor])
}
override def receive = {
case subscribe : Subscribe =>
topicActors.getOrElseUpdate(subscribe.topic, createDistributionActor) ! subscribe
case publish : Publish =>
topicActors.get(topic).foreach(_ ! publish)
}
}
class DistributionActor extends Actor {
val recipients = mutable.List.empty[ActorRef]
override def receive = {
case Subscribe(topic: String, actorRef: ActorRef) =>
recipients +: actorRef
case publish : Publish =>
recipients.map(_ ! publish)
}
}
这将确保您的总线 Actor 的邮箱不会饱和,因为总线的工作只是进行哈希查找。 DistributionActor 将负责映射接收者并分发有效负载。同样,DistributionActor 可以保留主题的任何状态。
我正在尝试在 Akka 的参与者模型之上实现事件总线(Pub-Sub)模式。
"Native" EventBus implementation doesn't meet some of my requirements (e.g. possibility of retaining only last message in a topic, it's specific for MQTT protocol, I'm implementing message broker for it https://github.com/butaji/JetMQ).
我的EventBus当前接口如下:
object Bus {
case class Subscribe(topic: String, actor: ActorRef)
case class Unsubscribe(topic: String, actor: ActorRef)
case class Publish(topic: String, payload: Any, retain: Boolean = false)
}
用法如下:
val system = ActorSystem("System")
val bus = system.actorOf(Props[MqttEventBus], name = "bus")
val device1 = system.actorOf(Props(new DeviceActor(bus)))
val device2 = system.actorOf(Props(new DeviceActor(bus)))
所有 设备 都引用了单个总线 actor。 Bus actor 负责存储订阅和主题的所有状态(例如保留消息)。
Device actors 可以自行决定发布、订阅或取消订阅主题。
经过一些性能基准测试后,我意识到我当前的设计会影响发布和订阅之间的处理时间,原因如下:
- 我的EventBus其实是单例的
- 它造成了巨大的队列处理负载
如何为我的事件总线实现分配(并行化)工作负载? 目前的解决方案是否适合 akka-cluster?
目前,我正在考虑 routing 通过以下几个总线实例:
val paths = (1 to 5).map(x => {
system.actorOf(Props[EventBusActor], name = "event-bus-" + x).path.toString
})
val bus_publisher = system.actorOf(RoundRobinGroup(paths).props())
val bus_manager = system.actorOf(BroadcastGroup(paths).props())
其中:
- bus_publisher会负责获取Publish,
- bus_manager 将负责获取订阅/取消订阅。
如下所示,它将跨所有总线复制状态,并根据负载分布减少每个参与者的队列。
您可以在单例总线内部而不是外部路由。您的总线可能负责路由消息和建立主题,而 sub-Actors 可能负责分发消息。一个演示我所描述内容但没有取消订阅功能、重复订阅检查或监督的基本示例:
import scala.collection.mutable
import akka.actor.{Actor, ActorRef}
class HashBus() extends Actor {
val topicActors = mutable.Map.empty[String, ActorRef]
def createDistributionActor = {
context.actorOf(Props[DistributionActor])
}
override def receive = {
case subscribe : Subscribe =>
topicActors.getOrElseUpdate(subscribe.topic, createDistributionActor) ! subscribe
case publish : Publish =>
topicActors.get(topic).foreach(_ ! publish)
}
}
class DistributionActor extends Actor {
val recipients = mutable.List.empty[ActorRef]
override def receive = {
case Subscribe(topic: String, actorRef: ActorRef) =>
recipients +: actorRef
case publish : Publish =>
recipients.map(_ ! publish)
}
}
这将确保您的总线 Actor 的邮箱不会饱和,因为总线的工作只是进行哈希查找。 DistributionActor 将负责映射接收者并分发有效负载。同样,DistributionActor 可以保留主题的任何状态。