限制只保留最新消息的 akka-actor
Throttling akka-actor that keeps only the newest messages
情况
我正在使用 akka actors 来更新我的网络客户端上的数据。其中一名参与者全权负责发送有关单个 Agent
的更新。这些代理的更新速度非常快(每 10 毫秒)。我现在的目标是限制此更新机制,以便每 300 毫秒发送每个 Agent
的最新版本。
我的代码
这是我到目前为止想出的:
/**
* Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
*/
class BroadcastSingleAgentActor extends Actor {
private implicit val ec: ExecutionContextExecutor = context.dispatcher
private var queue = Set[Agent]()
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
queue = Set()
}
override def receive: Receive = {
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}
}
问题
这个 actor (BroadcastSingleAgentActor
) 按预期工作,但我不能 100% 确定这是否是线程安全的(更新 queue
并可能清除它)。此外,这并不像是我在充分利用 akka 提供给我的工具。我发现了这个 article(Akka 2 中的限制消息),但我的问题是我需要保留最新的 Agent
消息,同时删除它的任何旧版本。有没有类似我需要的例子?
不,这不是线程安全的,因为通过 ActorSystem
的调度将发生在 receive
之外的另一个线程上。一个可能的想法是在 receive
方法中进行调度,因为传入 BroadcastSingleAgentActor
的消息将按顺序处理。
override def receive: Receive = {
case Refresh =>
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
}
queue = Set()
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}
情况
我正在使用 akka actors 来更新我的网络客户端上的数据。其中一名参与者全权负责发送有关单个 Agent
的更新。这些代理的更新速度非常快(每 10 毫秒)。我现在的目标是限制此更新机制,以便每 300 毫秒发送每个 Agent
的最新版本。
我的代码
这是我到目前为止想出的:
/**
* Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
*/
class BroadcastSingleAgentActor extends Actor {
private implicit val ec: ExecutionContextExecutor = context.dispatcher
private var queue = Set[Agent]()
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
queue = Set()
}
override def receive: Receive = {
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}
}
问题
这个 actor (BroadcastSingleAgentActor
) 按预期工作,但我不能 100% 确定这是否是线程安全的(更新 queue
并可能清除它)。此外,这并不像是我在充分利用 akka 提供给我的工具。我发现了这个 article(Akka 2 中的限制消息),但我的问题是我需要保留最新的 Agent
消息,同时删除它的任何旧版本。有没有类似我需要的例子?
不,这不是线程安全的,因为通过 ActorSystem
的调度将发生在 receive
之外的另一个线程上。一个可能的想法是在 receive
方法中进行调度,因为传入 BroadcastSingleAgentActor
的消息将按顺序处理。
override def receive: Receive = {
case Refresh =>
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
}
queue = Set()
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}