Akka 调度程序:生产中的奇怪行为(消息未触发)

Akka scheduler : strange behavior in production (messages not firing)

我正在开发一个 scala + akka 应用程序作为更大应用程序的一部分。该应用程序的目的是调用外部服务和 SQL 数据库(使用 JDBC),进行一些处理,并定期 return 解析结果。该应用程序使用 akka 集群,因此它可以水平扩展。

它应该如何工作

我正在集群上创建一个**单例参与者*,负责向 指令处理程序 参与者池发送指令。我正在从 Redis pub/sub 通道接收事件,该通道说明应刷新哪些 数据源 以及刷新频率。这个 SourceScheduler actor 将指令连同间隔存储在内部数组中。

然后我使用 akka Scheduler 每秒执行一个 tick 函数。此函数过滤数组以确定需要执行哪些指令,并将消息发送到指令处理程序池。池中的路由执行指令并通过 Redis Pub/Sub

发出结果

问题

在我的机器上(Ryzen 7 + 16GB RAM + ArchLinux)一切正常 运行,我们正在轻松处理 2500 个数据库 calls/second。但是一旦投入生产,我无法让它处理超过 ~400 requests/s。

SourceScheduler 不会每秒滴答,消息会卡在邮箱中。此外,该应用程序使用更多 CPU 资源和更多内存(生产环境中为 1.3GB,而我的机器上为 ~350MB)

生产应用程序 运行s 在 Rancher 上基于 JRE-8 alpine 的 Docker 容器中,在 MS Azure 服务器上。

我知道集群上的单例 actor 可能 是一个瓶颈,但由于它只将消息转发给其他 actor,我不明白它如何阻止。

我试过的

有没有人经历过本地机器和生产服务器之间的这种差异?


编辑 SourceScheduler.scala

class SourceScheduler extends Actor with ActorLogging with Timers {
  case object Tick
  case object SchedulerReport
  import context.dispatcher

  val instructionHandlerPool = context.actorOf(
    ClusterRouterGroup(
      RoundRobinGroup(Nil),
      ClusterRouterGroupSettings(
        totalInstances = 10,
        routeesPaths = List("/user/instructionHandler"),
        allowLocalRoutees = true
      )
    ).props(),
    name = "instructionHandlerRouter")

  var ticks: Int = 0
  var refreshedSources: Int = 0
  val maxTicks: Int = Int.MaxValue - 1

  var scheduledSources = Array[(String, Int, String)]()

  override def preStart(): Unit = {
    log.info("Starting Scheduler")
  }

  def refreshSource(hash: String) = {
    instructionHandlerPool ! Instruction(hash)
    refreshedSources += 1
  }

  // Get sources that neeed to be refreshed
  def getEligibleSources(sources: Seq[(String, Int, String)], tick: Int) = {
    sources.groupBy(_._1).mapValues(_.toList.minBy(_._2)).values.filter(tick * 1000 % _._2 == 0).map(_._1)
  }

  def tick(): Unit = {
    ticks += 1
    log.debug("Scheduler TICK {}", ticks)
    val eligibleSources = getEligibleSources(scheduledSources, ticks)
    val chunks = eligibleSources.grouped(ConnectionPoolManager.connectionPoolSize).zipWithIndex.toList
    log.debug("Scheduling {} sources in {} chunks", eligibleSources.size, chunks.size)
    chunks.foreach({
      case(sources, index) =>
        after((index * 25 + 5) milliseconds, context.system.scheduler)(Future.successful {
          sources.foreach(refreshSource)
        })
    })
    if(ticks >= maxTicks) ticks = 0
  }
  timers.startPeriodicTimer("schedulerTickTimer", Tick, 990 milliseconds)
  timers.startPeriodicTimer("schedulerReportTimer", SchedulerReport, 10 seconds)

  def receive: Receive = {
    case AttachSource(hash, interval, socketId) =>
      scheduledSources.synchronized {
        scheduledSources = scheduledSources :+ ((hash, interval, socketId))
      }
    case DetachSource(socketId) =>
      scheduledSources.synchronized {
        scheduledSources = scheduledSources.filterNot(_._3 == socketId)
      }
    case SchedulerReport =>
      log.info("{} sources were scheduled since last report", refreshedSources)
      refreshedSources = 0
    case Tick => tick()
    case _ =>
  }
}

每个源都由包含执行所需的所有数据(例如数据库的主机)、刷新间隔和请求它的客户端的唯一 ID 的哈希确定,以便我们可以停止客户端断开连接时刷新。 每一秒,我们都会通过对 ticks 计数器的当前值应用模数来检查源是否需要刷新。 我们以较小的块刷新源以避免连接池饥饿 问题是在小负载下 (~300 rq/s) tick 函数不再每秒执行一次

也许瓶颈在网络延迟上?在您的机器中,所有组件 运行 并排,通信应该没有延迟,但在集群中,如果您从一台机器到另一台机器进行大量数据库调用,网络延迟可能会很明显。

事实证明问题出在 Rancher 身上。 我们进行了多次测试,该应用程序在机器上 运行 直接运行良好,在 docker 上运行良好,但在使用 Rancher 作为编排器时则不然。我不确定为什么,但因为它与 Akka 无关,所以我关闭了这个问题。 谢谢大家的帮助。