Akka 调度程序:生产中的奇怪行为(消息未触发)
Akka scheduler : strange behavior in production (messages not firing)
我正在开发一个 scala + akka 应用程序作为更大应用程序的一部分。该应用程序的目的是调用外部服务和 SQL 数据库(使用 JDBC),进行一些处理,并定期 return 解析结果。该应用程序使用 akka 集群,因此它可以水平扩展。
它应该如何工作
我正在集群上创建一个**单例参与者*,负责向 指令处理程序 参与者池发送指令。我正在从 Redis pub/sub 通道接收事件,该通道说明应刷新哪些 数据源 以及刷新频率。这个 SourceScheduler actor 将指令连同间隔存储在内部数组中。
然后我使用 akka Scheduler 每秒执行一个 tick 函数。此函数过滤数组以确定需要执行哪些指令,并将消息发送到指令处理程序池。池中的路由执行指令并通过 Redis Pub/Sub
发出结果
问题
在我的机器上(Ryzen 7 + 16GB RAM + ArchLinux)一切正常 运行,我们正在轻松处理 2500 个数据库 calls/second。但是一旦投入生产,我无法让它处理超过 ~400 requests/s。
SourceScheduler 不会每秒滴答,消息会卡在邮箱中。此外,该应用程序使用更多 CPU 资源和更多内存(生产环境中为 1.3GB,而我的机器上为 ~350MB)
生产应用程序 运行s 在 Rancher 上基于 JRE-8 alpine 的 Docker 容器中,在 MS Azure 服务器上。
我知道集群上的单例 actor 可能 是一个瓶颈,但由于它只将消息转发给其他 actor,我不明白它如何阻止。
我试过的
- 我使用 Tomcat JDBC 作为 SQL 查询的连接池管理器。我确定我不会泄漏任何连接,因为我记录了从池中借用的每个连接以及 return 到它的每个连接
- 像 JDBC 查询这样的阻塞操作都是在一个单独的调度器上执行的,一个具有 500 个线程的固定线程池执行器,所以所有其他参与者应该 运行 正确地
- 我还为 SourceScheduler actor 提供了一个专用的固定调度程序,因此它应该 运行 在它自己的线程上
- 我已经尝试 运行将应用程序集成到具有 3 个节点的集群中,但没有提高性能。由于 SourceScheduler 是单例,运行宁多个节点不能解决问题
- 我已经在我同事的机器上试用了该应用程序。奇迹般有效。我只是遇到了生产服务器的问题
- 我已经尝试将生产服务器升级到 Azure 上可用的最强大的服务器(16 核,2.3ghz),没有明显的变化
有没有人经历过本地机器和生产服务器之间的这种差异?
编辑 SourceScheduler.scala
class SourceScheduler extends Actor with ActorLogging with Timers {
case object Tick
case object SchedulerReport
import context.dispatcher
val instructionHandlerPool = context.actorOf(
ClusterRouterGroup(
RoundRobinGroup(Nil),
ClusterRouterGroupSettings(
totalInstances = 10,
routeesPaths = List("/user/instructionHandler"),
allowLocalRoutees = true
)
).props(),
name = "instructionHandlerRouter")
var ticks: Int = 0
var refreshedSources: Int = 0
val maxTicks: Int = Int.MaxValue - 1
var scheduledSources = Array[(String, Int, String)]()
override def preStart(): Unit = {
log.info("Starting Scheduler")
}
def refreshSource(hash: String) = {
instructionHandlerPool ! Instruction(hash)
refreshedSources += 1
}
// Get sources that neeed to be refreshed
def getEligibleSources(sources: Seq[(String, Int, String)], tick: Int) = {
sources.groupBy(_._1).mapValues(_.toList.minBy(_._2)).values.filter(tick * 1000 % _._2 == 0).map(_._1)
}
def tick(): Unit = {
ticks += 1
log.debug("Scheduler TICK {}", ticks)
val eligibleSources = getEligibleSources(scheduledSources, ticks)
val chunks = eligibleSources.grouped(ConnectionPoolManager.connectionPoolSize).zipWithIndex.toList
log.debug("Scheduling {} sources in {} chunks", eligibleSources.size, chunks.size)
chunks.foreach({
case(sources, index) =>
after((index * 25 + 5) milliseconds, context.system.scheduler)(Future.successful {
sources.foreach(refreshSource)
})
})
if(ticks >= maxTicks) ticks = 0
}
timers.startPeriodicTimer("schedulerTickTimer", Tick, 990 milliseconds)
timers.startPeriodicTimer("schedulerReportTimer", SchedulerReport, 10 seconds)
def receive: Receive = {
case AttachSource(hash, interval, socketId) =>
scheduledSources.synchronized {
scheduledSources = scheduledSources :+ ((hash, interval, socketId))
}
case DetachSource(socketId) =>
scheduledSources.synchronized {
scheduledSources = scheduledSources.filterNot(_._3 == socketId)
}
case SchedulerReport =>
log.info("{} sources were scheduled since last report", refreshedSources)
refreshedSources = 0
case Tick => tick()
case _ =>
}
}
每个源都由包含执行所需的所有数据(例如数据库的主机)、刷新间隔和请求它的客户端的唯一 ID 的哈希确定,以便我们可以停止客户端断开连接时刷新。
每一秒,我们都会通过对 ticks 计数器的当前值应用模数来检查源是否需要刷新。
我们以较小的块刷新源以避免连接池饥饿
问题是在小负载下 (~300 rq/s) tick 函数不再每秒执行一次
也许瓶颈在网络延迟上?在您的机器中,所有组件 运行 并排,通信应该没有延迟,但在集群中,如果您从一台机器到另一台机器进行大量数据库调用,网络延迟可能会很明显。
事实证明问题出在 Rancher 身上。
我们进行了多次测试,该应用程序在机器上 运行 直接运行良好,在 docker 上运行良好,但在使用 Rancher 作为编排器时则不然。我不确定为什么,但因为它与 Akka 无关,所以我关闭了这个问题。
谢谢大家的帮助。
我正在开发一个 scala + akka 应用程序作为更大应用程序的一部分。该应用程序的目的是调用外部服务和 SQL 数据库(使用 JDBC),进行一些处理,并定期 return 解析结果。该应用程序使用 akka 集群,因此它可以水平扩展。
它应该如何工作
我正在集群上创建一个**单例参与者*,负责向 指令处理程序 参与者池发送指令。我正在从 Redis pub/sub 通道接收事件,该通道说明应刷新哪些 数据源 以及刷新频率。这个 SourceScheduler actor 将指令连同间隔存储在内部数组中。
然后我使用 akka Scheduler 每秒执行一个 tick 函数。此函数过滤数组以确定需要执行哪些指令,并将消息发送到指令处理程序池。池中的路由执行指令并通过 Redis Pub/Sub
发出结果问题
在我的机器上(Ryzen 7 + 16GB RAM + ArchLinux)一切正常 运行,我们正在轻松处理 2500 个数据库 calls/second。但是一旦投入生产,我无法让它处理超过 ~400 requests/s。
SourceScheduler 不会每秒滴答,消息会卡在邮箱中。此外,该应用程序使用更多 CPU 资源和更多内存(生产环境中为 1.3GB,而我的机器上为 ~350MB)
生产应用程序 运行s 在 Rancher 上基于 JRE-8 alpine 的 Docker 容器中,在 MS Azure 服务器上。
我知道集群上的单例 actor 可能 是一个瓶颈,但由于它只将消息转发给其他 actor,我不明白它如何阻止。
我试过的
- 我使用 Tomcat JDBC 作为 SQL 查询的连接池管理器。我确定我不会泄漏任何连接,因为我记录了从池中借用的每个连接以及 return 到它的每个连接
- 像 JDBC 查询这样的阻塞操作都是在一个单独的调度器上执行的,一个具有 500 个线程的固定线程池执行器,所以所有其他参与者应该 运行 正确地
- 我还为 SourceScheduler actor 提供了一个专用的固定调度程序,因此它应该 运行 在它自己的线程上
- 我已经尝试 运行将应用程序集成到具有 3 个节点的集群中,但没有提高性能。由于 SourceScheduler 是单例,运行宁多个节点不能解决问题
- 我已经在我同事的机器上试用了该应用程序。奇迹般有效。我只是遇到了生产服务器的问题
- 我已经尝试将生产服务器升级到 Azure 上可用的最强大的服务器(16 核,2.3ghz),没有明显的变化
有没有人经历过本地机器和生产服务器之间的这种差异?
编辑 SourceScheduler.scala
class SourceScheduler extends Actor with ActorLogging with Timers {
case object Tick
case object SchedulerReport
import context.dispatcher
val instructionHandlerPool = context.actorOf(
ClusterRouterGroup(
RoundRobinGroup(Nil),
ClusterRouterGroupSettings(
totalInstances = 10,
routeesPaths = List("/user/instructionHandler"),
allowLocalRoutees = true
)
).props(),
name = "instructionHandlerRouter")
var ticks: Int = 0
var refreshedSources: Int = 0
val maxTicks: Int = Int.MaxValue - 1
var scheduledSources = Array[(String, Int, String)]()
override def preStart(): Unit = {
log.info("Starting Scheduler")
}
def refreshSource(hash: String) = {
instructionHandlerPool ! Instruction(hash)
refreshedSources += 1
}
// Get sources that neeed to be refreshed
def getEligibleSources(sources: Seq[(String, Int, String)], tick: Int) = {
sources.groupBy(_._1).mapValues(_.toList.minBy(_._2)).values.filter(tick * 1000 % _._2 == 0).map(_._1)
}
def tick(): Unit = {
ticks += 1
log.debug("Scheduler TICK {}", ticks)
val eligibleSources = getEligibleSources(scheduledSources, ticks)
val chunks = eligibleSources.grouped(ConnectionPoolManager.connectionPoolSize).zipWithIndex.toList
log.debug("Scheduling {} sources in {} chunks", eligibleSources.size, chunks.size)
chunks.foreach({
case(sources, index) =>
after((index * 25 + 5) milliseconds, context.system.scheduler)(Future.successful {
sources.foreach(refreshSource)
})
})
if(ticks >= maxTicks) ticks = 0
}
timers.startPeriodicTimer("schedulerTickTimer", Tick, 990 milliseconds)
timers.startPeriodicTimer("schedulerReportTimer", SchedulerReport, 10 seconds)
def receive: Receive = {
case AttachSource(hash, interval, socketId) =>
scheduledSources.synchronized {
scheduledSources = scheduledSources :+ ((hash, interval, socketId))
}
case DetachSource(socketId) =>
scheduledSources.synchronized {
scheduledSources = scheduledSources.filterNot(_._3 == socketId)
}
case SchedulerReport =>
log.info("{} sources were scheduled since last report", refreshedSources)
refreshedSources = 0
case Tick => tick()
case _ =>
}
}
每个源都由包含执行所需的所有数据(例如数据库的主机)、刷新间隔和请求它的客户端的唯一 ID 的哈希确定,以便我们可以停止客户端断开连接时刷新。 每一秒,我们都会通过对 ticks 计数器的当前值应用模数来检查源是否需要刷新。 我们以较小的块刷新源以避免连接池饥饿 问题是在小负载下 (~300 rq/s) tick 函数不再每秒执行一次
也许瓶颈在网络延迟上?在您的机器中,所有组件 运行 并排,通信应该没有延迟,但在集群中,如果您从一台机器到另一台机器进行大量数据库调用,网络延迟可能会很明显。
事实证明问题出在 Rancher 身上。 我们进行了多次测试,该应用程序在机器上 运行 直接运行良好,在 docker 上运行良好,但在使用 Rancher 作为编排器时则不然。我不确定为什么,但因为它与 Akka 无关,所以我关闭了这个问题。 谢谢大家的帮助。