akka 自定义 fork-join-executor 调度程序在 OSX 和 RHEL 上的行为不同
akka custom fork-join-executor dispatcher behaves differently on OSX and RHEL
当我在生产机器上使用 Akka 框架部署 Play 框架应用程序时,它的行为与在我的开发工作站上不同。
这是一个接收一批设备IP地址的系统,它对每台设备执行一些处理,并在处理完这批设备中的所有设备后汇总结果。此处理不是很 CPU 密集。
我基本上有两种类型的 actor,BatchActor 和 DeviceActor。对于这些设备,我创建了一个由 RoundRobinPool 路由器和自定义调度程序支持的参与者。我试图一次处理 ~500 个设备(并行)。
这个问题是当我在我的 OSX 机器上运行这段代码时,它运行正常,除了。
例如,如果我提交了一批 200 个设备 IP 地址,则应用程序在我的工作站上并行运行所有设备。
然而,当我将此应用程序复制到生产机器 Red Hat Enterprise Linux (RHEL) 并运行它并提交相同的设备列表时,它只处理 1 到 2 个设备一次.
我需要做什么来解决这个问题?
相关代码如下:
object Application extends Controller {
...
val numberOfWorkers = 500
val workers = Akka.system.actorOf(Props[DeviceActor]
.withRouter(RoundRobinPool(nrOfInstances = numberOfWorkers))
.withDispatcher("my-dispatcher")
)
def batchActor(config:BatchConfig)
= Akka.system.actorOf(BatchActor.props(workers, config), s"batch-${config.batchId}")
...
def batch = Action(parse.json) { request =>
request.body.validate[BatchConfig] match {
case config:BatchConfig => {
...
val batch = batchActor(config)
batch ! BatchActorProtocol.Start
Ok(Json.toJson(status))
}
...
}
}
application.conf 配置部分如下所示:
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1000
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 100.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 5000
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 500
}
在 BatchActor 中,我只是解析设备列表并将其提供给
class BatchActor(val workers:ActorRef, val config:BatchConfig) extends Actor
...
def receive = {
case Start => start
...
}
private def start = {
...
devices.map { devices =>
results(devices.host) = None
workers ! DeviceWork(self, config, devices, steps)
}
...
}
之后 WorkerActor 将结果对象提交回 BatchActer。
我的工作站:OS X - v10.9.3
java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)
生产机器:Red Hat Enterprise Linux 服务器版本 6.5 (Santiago)
java -version
java version "1.7.0_65"
OpenJDK Runtime Environment (rhel-2.5.1.2.el6_5-x86_64 u65-b17)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)
软件:
Scala: v2.11.2
SBT: v0.13.6
Play: v2.3.5
Akka: v2.3.4
我正在使用类型安全 activator/sbt 来启动应用程序。命令如下:
cd <project dir>
./activator run -Dhttp.port=6600
感谢任何帮助。我已经在这个问题上停留了几天了。
我认为您的代码中 并行性 太多,即您在调度程序中创建了太多线程。您的 Redhat 盒子上有多少个内核?我从未见过如此高的价值。 FJ 池中的大量线程可能会导致大量的上下文切换。尝试仅使用默认调度程序,看看是否能解决您的问题。您还可以将最小和最大并行度的值更改为您拥有的核心数的 2 或 3 倍。
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1000
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 100.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 5000
}
要尝试的另一件事是使用 (sbt-assembly) 创建一个超级 jar,然后部署它,而不是使用激活器来部署它。
最后,您可以使用 VisualJVM 或 Yourkit 之类的工具查看您的 JVM。
在花费数小时尝试不同的事情后,包括:
- 研究 linux 上的不同线程实现 - pthreads 与 NPTL
- 阅读所有关于线程的 VM 文档
- ulimits
- 尝试对 Play 和 Akka 框架配置进行各种更改
- 最后使用 scala futures 等完全重写了线程管理。
似乎没有任何效果。然后我做了一个详细的比较,唯一不同的是我在笔记本电脑上使用了 Oracle Hotspot 实现,而在生产机器上使用了 OpenJDK 实现。
所以我在生产机器上安装了 Oracle VM,这似乎解决了这个问题。尽管我无法确定最终的解决方案是什么,但似乎 OpenJDK 在 RHEL 上的默认安装已被编译或配置得足够不同,以至于不允许一次生成约 500 个线程。
我确定我遗漏了一些东西,但经过大约 3 天的搜索我找不到它。
当我在生产机器上使用 Akka 框架部署 Play 框架应用程序时,它的行为与在我的开发工作站上不同。
这是一个接收一批设备IP地址的系统,它对每台设备执行一些处理,并在处理完这批设备中的所有设备后汇总结果。此处理不是很 CPU 密集。
我基本上有两种类型的 actor,BatchActor 和 DeviceActor。对于这些设备,我创建了一个由 RoundRobinPool 路由器和自定义调度程序支持的参与者。我试图一次处理 ~500 个设备(并行)。
这个问题是当我在我的 OSX 机器上运行这段代码时,它运行正常,除了。
例如,如果我提交了一批 200 个设备 IP 地址,则应用程序在我的工作站上并行运行所有设备。
然而,当我将此应用程序复制到生产机器 Red Hat Enterprise Linux (RHEL) 并运行它并提交相同的设备列表时,它只处理 1 到 2 个设备一次.
我需要做什么来解决这个问题?
相关代码如下:
object Application extends Controller {
...
val numberOfWorkers = 500
val workers = Akka.system.actorOf(Props[DeviceActor]
.withRouter(RoundRobinPool(nrOfInstances = numberOfWorkers))
.withDispatcher("my-dispatcher")
)
def batchActor(config:BatchConfig)
= Akka.system.actorOf(BatchActor.props(workers, config), s"batch-${config.batchId}")
...
def batch = Action(parse.json) { request =>
request.body.validate[BatchConfig] match {
case config:BatchConfig => {
...
val batch = batchActor(config)
batch ! BatchActorProtocol.Start
Ok(Json.toJson(status))
}
...
}
}
application.conf 配置部分如下所示:
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1000
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 100.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 5000
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 500
}
在 BatchActor 中,我只是解析设备列表并将其提供给
class BatchActor(val workers:ActorRef, val config:BatchConfig) extends Actor
...
def receive = {
case Start => start
...
}
private def start = {
...
devices.map { devices =>
results(devices.host) = None
workers ! DeviceWork(self, config, devices, steps)
}
...
}
之后 WorkerActor 将结果对象提交回 BatchActer。
我的工作站:OS X - v10.9.3
java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)
生产机器:Red Hat Enterprise Linux 服务器版本 6.5 (Santiago)
java -version
java version "1.7.0_65"
OpenJDK Runtime Environment (rhel-2.5.1.2.el6_5-x86_64 u65-b17)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)
软件:
Scala: v2.11.2
SBT: v0.13.6
Play: v2.3.5
Akka: v2.3.4
我正在使用类型安全 activator/sbt 来启动应用程序。命令如下:
cd <project dir>
./activator run -Dhttp.port=6600
感谢任何帮助。我已经在这个问题上停留了几天了。
我认为您的代码中 并行性 太多,即您在调度程序中创建了太多线程。您的 Redhat 盒子上有多少个内核?我从未见过如此高的价值。 FJ 池中的大量线程可能会导致大量的上下文切换。尝试仅使用默认调度程序,看看是否能解决您的问题。您还可以将最小和最大并行度的值更改为您拥有的核心数的 2 或 3 倍。
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1000
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 100.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 5000
}
要尝试的另一件事是使用 (sbt-assembly) 创建一个超级 jar,然后部署它,而不是使用激活器来部署它。
最后,您可以使用 VisualJVM 或 Yourkit 之类的工具查看您的 JVM。
在花费数小时尝试不同的事情后,包括:
- 研究 linux 上的不同线程实现 - pthreads 与 NPTL
- 阅读所有关于线程的 VM 文档
- ulimits
- 尝试对 Play 和 Akka 框架配置进行各种更改
- 最后使用 scala futures 等完全重写了线程管理。
似乎没有任何效果。然后我做了一个详细的比较,唯一不同的是我在笔记本电脑上使用了 Oracle Hotspot 实现,而在生产机器上使用了 OpenJDK 实现。
所以我在生产机器上安装了 Oracle VM,这似乎解决了这个问题。尽管我无法确定最终的解决方案是什么,但似乎 OpenJDK 在 RHEL 上的默认安装已被编译或配置得足够不同,以至于不允许一次生成约 500 个线程。
我确定我遗漏了一些东西,但经过大约 3 天的搜索我找不到它。