akka 自定义 fork-join-executor 调度程序在 OSX 和 RHEL 上的行为不同

akka custom fork-join-executor dispatcher behaves differently on OSX and RHEL

当我在生产机器上使用 Akka 框架部署 Play 框架应用程序时,它的行为与在我的开发工作站上不同。

这是一个接收一批设备IP地址的系统,它对每台设备执行一些处理,并在处理完这批设备中的所有设备后汇总结果。此处理不是很 CPU 密集。

我基本上有两种类型的 actor,BatchActor 和 DeviceActor。对于这些设备,我创建了一个由 RoundRobinPool 路由器和自定义调度程序支持的参与者。我试图一次处理 ~500 个设备(并行)。

这个问题是当我在我的 OSX 机器上运行这段代码时,它运行正常,除了。

例如,如果我提交了一批 200 个设备 IP 地址,则应用程序在我的工作站上并行运行所有设备。

然而,当我将此应用程序复制到生产机器 Red Hat Enterprise Linux (RHEL) 并运行它并提交相同的设备列表时,它只处理 1 到 2 个设备一次.

我需要做什么来解决这个问题?

相关代码如下:

object Application extends Controller {
    ...

    val numberOfWorkers = 500
    val workers = Akka.system.actorOf(Props[DeviceActor]
          .withRouter(RoundRobinPool(nrOfInstances = numberOfWorkers))
          .withDispatcher("my-dispatcher")
    )

    def batchActor(config:BatchConfig) 
        = Akka.system.actorOf(BatchActor.props(workers, config), s"batch-${config.batchId}")

    ...

    def batch = Action(parse.json) { request =>
        request.body.validate[BatchConfig] match {

            case config:BatchConfig => {
                ...

                val batch = batchActor(config)
                batch ! BatchActorProtocol.Start

                Ok(Json.toJson(status))
            }

            ...

        }
    }

application.conf 配置部分如下所示:

my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 1000
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 100.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 5000
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 500
}

在 BatchActor 中,我只是解析设备列表并将其提供给

class BatchActor(val workers:ActorRef, val config:BatchConfig) extends Actor
  ...

  def receive = {
      case Start => start
      ...
  }

  private def start = {
      ...

      devices.map { devices =>
        results(devices.host) = None
        workers ! DeviceWork(self, config, devices, steps) 
      }

      ...
  }

之后 WorkerActor 将结果对象提交回 BatchActer。

我的工作站:OS X - v10.9.3

java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode) 

生产机器:Red Hat Enterprise Linux 服务器版本 6.5 (Santiago)

java -version
java version "1.7.0_65"
OpenJDK Runtime Environment (rhel-2.5.1.2.el6_5-x86_64 u65-b17)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)

软件:

Scala: v2.11.2
SBT: v0.13.6
Play: v2.3.5
Akka: v2.3.4

我正在使用类型安全 activator/sbt 来启动应用程序。命令如下:

cd <project dir>
./activator run -Dhttp.port=6600

感谢任何帮助。我已经在这个问题上停留了几天了。

我认为您的代码中 并行性 太多,即您在调度程序中创建了太多线程。您的 Redhat 盒子上有多少个内核?我从未见过如此高的价值。 FJ 池中的大量线程可能会导致大量的上下文切换。尝试仅使用默认调度程序,看看是否能解决您的问题。您还可以将最小和最大并行度的值更改为您拥有的核心数的 2 或 3 倍。

fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 1000
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 100.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 5000
  }

要尝试的另一件事是使用 (sbt-assembly) 创建一个超级 jar,然后部署它,而不是使用激活器来部署它。

最后,您可以使用 VisualJVM 或 Yourkit 之类的工具查看您的 JVM。

在花费数小时尝试不同的事情后,包括:

  • 研究 linux 上的不同线程实现 - pthreads 与 NPTL
  • 阅读所有关于线程的 VM 文档
  • ulimits
  • 尝试对 Play 和 Akka 框架配置进行各种更改
  • 最后使用 scala futures 等完全重写了线程管理。

似乎没有任何效果。然后我做了一个详细的比较,唯一不同的是我在笔记本电脑上使用了 Oracle Hotspot 实现,而在生产机器上使用了 OpenJDK 实现。

所以我在生产机器上安装了 Oracle VM,这似乎解决了这个问题。尽管我无法确定最终的解决方案是什么,但似乎 OpenJDK 在 RHEL 上的默认安装已被编译或配置得足够不同,以至于不允许一次生成约 500 个线程。

我确定我遗漏了一些东西,但经过大约 3 天的搜索我找不到它。