反应迟钝的参与者系统:ThreadPoolExecutor 调度程序只创建核心线程池,显然忽略了最大线程池大小

Unresponsive actor system: ThreadPoolExecutor dispatcher only creates core thread pool, apparently ignores max thread pool size

更新:我发现如果我将 ThreadPoolExecutor's 核心池大小设置为与最大池大小(29 个线程)相同,我的程序仍然可以响应。但是,如果我将核心池大小设置为 11,将最大池大小设置为 29,那么 actor 系统只会创建 11 个线程。如何配置 ActorSystem / ThreadPoolExecutor 以继续创建线程以超过核心线程数并保持在最大线程数内?我不想将核心线程数设置为最大线程数,因为我只需要额外的线程来取消作业(这应该是罕见事件)。


我有一个针对 Oracle 数据库的批处理程序 运行ning,使用 Java/Akka 类型的角色和以下角色实现:

  1. BatchManager 负责与 REST 控制器对话。它管理 Queue 个未初始化的批处理作业;当从队列中轮询未初始化的批处理作业时,它会变成 JobManager actor 并执行。
  2. JobManager维护一个存储过程队列和一个Workers池;它用存储过程初始化每个 Worker,当 Worker 完成时,它将过程的结果发送到 JobManager,然后 JobManager 将另一个存储过程发送到 Worker。当作业队列为空且所有 Workers 空闲时批处理终止,此时 JobManager 将其结果报告给 BatchManager,关闭其工作程序(通过 TypedActor.context().stop() ),然后自行关闭。 JobManager 有一个 Promise<Status> completion,它在作业成功完成时完成,或者在作业因取消或致命异常而终止时完成。
  3. Worker 执行存储过程。它创建用于执行存储过程的 OracleConnection and a CallableStatement,并使用 JobManager.completionabort 连接和 cancel 语句注册一个 onFailure 回调。此回调不使用 actor 系统的执行上下文,而是使用从 BatchManager.
  4. 中创建的缓存执行程序服务创建的执行上下文

我的配置是

{"akka" : { "actor" : { "default-dispatcher" : {
    "type" : "Dispatcher",
    "executor" : "default-executor",
    "throughput" : "1",
    "default-executor" : { "fallback" : "thread-pool-executor" }
    "thread-pool-executor" : {
        "keep-alive-time" : "60s",
        "core-pool-size-min" : coreActorCount,
        "core-pool-size-max" : coreActorCount,
        "max-pool-size-min" : maxActorCount,
        "max-pool-size-max" : maxActorCount,
        "task-queue-size" : "-1",
        "task-queue-type" : "linked",
        "allow-core-timeout" : "on"
}}}}}

工人数量配置在别处,目前workerCount = 8coreActorCountworkerCount + 3maxActorCountworkerCount * 3 + 5。我正在具有两个内核和 8GB 内存的 Macbook Pro 10 上对此进行测试;生产服务器要大得多。我正在与之交谈的数据库位于极慢的 VPN 之后。我正在 运行 使用 Oracle 的 JavaSE 1.8 JVM 完成所有这些工作。本地服务器是 Tomcat 7。Oracle JDBC 驱动程序是 10.2 版(我也许可以说服当局使用更新的版本)。所有方法 return voidFuture<> 都应该是非阻塞的。

当一个批次成功终止时,就没有问题了——下一批次立即开始,并有完整的工作人员。但是,如果我通过 JobManager#completion.tryFailure(new CancellationException("Batch cancelled")) 终止当前批处理,则 Workers 注册的 onFailure 回调将关闭,然后系统变得无响应。调试 printlns 表明新批次从八个工作人员中的三个开始,并且 BatchManager 变得完全没有响应(我添加了一个 Future<String> ping 命令,它只是 return 一个 Futures.successful("ping") 和这也会超时)。 onFailure 回调在单独的线程池上执行,即使它们在 actor 系统的线程池上,我也应该有足够高的 max-pool-size 来容纳原始的 JobManager,它的 Workers,它的 onFailure 回调,第二个 JobManagerWorkers。相反,我似乎在容纳原始的 JobManager 及其 Workers、新的 JobManager 及其不到一半的 Workers,并且没有为 [=61= 留下任何东西] 我 运行 正在使用的计算机资源不足,但它似乎应该能够 运行 十几个线程。

这是配置问题吗?这是由于 JVM 施加的限制 and/or 和 Tomcat 施加的限制吗?这是因为我处理阻塞 IO 的方式有问题吗?可能还有其他几件事我可能做错了,这些只是我想到的。

Gist of CancellableStatement 其中 CallableStatementOracleConnection 被取消

Gist of Immutable 其中 CancellableStatements 被创建

Gist of JobManager's cleanup code

Config dump 通过 System.out.println(mergedConfig.toString());

获得

编辑:我相信我已经将问题缩小到参与者系统(它的配置或者它与阻塞数据库调用的交互)。我删除了 Worker actors 并将他们的工作负载转移到 Runnables 上执行固定大小 ThreadPoolExecutor,其中每个 JobManager 创建自己的 ThreadPoolExecutor 并关闭它批处理完成时关闭(shutDown 正常终止,shutDownNow 异常终止)。在 BatchManager 中实例化的缓存线程池上取消 运行s。 Actor 系统的调度程序仍然是 ThreadPoolExecutor,但只分配了六个线程。使用此替代设置,取消会按预期执行 - 工作线程在其数据库连接中止时终止,并且新的 JobManager 会立即执行完整的工作线程。这向我表明这不是 hardware/JVM/Tomcat 问题。


更新:我使用 Eclipse's Memory Analyzer 进行了线程转储。我发现取消线程挂在 CallableStatement.close() 上,所以我重新排序取消,以便 OracleConnection.abort()CallableStatement.cancel() 之前,这解决了问题 - 所有取消(显然)都正确执行。 Worker 线程继续挂在他们的声明上,但是 - 我怀疑我的 VPN 可能部分或全部归咎于此。

PerformanceAsync-akka.actor.default-dispatcher-19
  at java.net.SocketInputStream.socketRead0(Ljava/io/FileDescriptor;[BIII)I (Native Method)
  at java.net.SocketInputStream.read([BIII)I (SocketInputStream.java:150)
  at java.net.SocketInputStream.read([BII)I (SocketInputStream.java:121)
  at oracle.net.ns.Packet.receive()V (Unknown Source)
  at oracle.net.ns.DataPacket.receive()V (Unknown Source)
  at oracle.net.ns.NetInputStream.getNextPacket()V (Unknown Source)
  at oracle.net.ns.NetInputStream.read([BII)I (Unknown Source)
  at oracle.net.ns.NetInputStream.read([B)I (Unknown Source)
  at oracle.net.ns.NetInputStream.read()I (Unknown Source)
  at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1()S (T4CMAREngine.java:1109)
  at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1()B (T4CMAREngine.java:1080)
  at oracle.jdbc.driver.T4C8Oall.receive()V (T4C8Oall.java:485)
  at oracle.jdbc.driver.T4CCallableStatement.doOall8(ZZZZ)V (T4CCallableStatement.java:218)
  at oracle.jdbc.driver.T4CCallableStatement.executeForRows(Z)V (T4CCallableStatement.java:971)
  at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout()V (OracleStatement.java:1192)
  at oracle.jdbc.driver.OraclePreparedStatement.executeInternal()I (OraclePreparedStatement.java:3415)
  at oracle.jdbc.driver.OraclePreparedStatement.execute()Z (OraclePreparedStatement.java:3521)
  at oracle.jdbc.driver.OracleCallableStatement.execute()Z (OracleCallableStatement.java:4612)
  at com.util.CPProcExecutor.execute(Loracle/jdbc/OracleConnection;Ljava/sql/CallableStatement;Lcom/controller/BaseJobRequest;)V (CPProcExecutor.java:57)

然而,即使在修复了取消订单之后,我仍然遇到 actor 系统没有创建足够线程的问题:我仍然只能在新批次中获得八分之三的工人,并添加了新工人因为取消的工作人员的网络连接超时。我总共有 11 个线程——我的核心池大小,29 个线程中的——我的最大池大小。显然 actor 系统忽略了我的最大池大小参数,或者我没有正确配置最大池大小。

没有足够的代码来提供解决方案,但是当系统变得无响应时,您可以检查您的系统资源利用率(cpu、ram),如果它们没有改变,请检查 Oracle 数据库。

如果当您终止一组连接时,另一个作业会立即启动:我猜想在 oracle 级别存在阻塞会话(未提交的写事务正在阻塞同一资源上的另一个写事务)。

处于无响应状态时,检查阻塞会话:

SELECT s1.username || '@' || s1.machine
    || ' ( SID=' || s1.sid || ' )  is blocking '
    || s2.username || '@' || s2.machine || ' ( SID=' || s2.sid || ' ) ' AS blocking_status
    FROM v$lock l1, v$session s1, v$lock l2, v$session s2
    WHERE s1.sid=l1.sid AND s2.sid=l2.sid
    AND l1.BLOCK=1 AND l2.request > 0
    AND l1.id1 = l2.id1
    AND l1.id2 = l2.id2

(免责声明:我不知道 Akka)

根据你下面配置的queue-size=-1,我猜任务队列是无界的。

  "task-queue-size": "-1",
  "task-queue-type": "linked"

ThreadPoolExecutor 不会生成超出核心池大小的内容,除非工作队列已满且无法排队。只有当任务队列已满时,它才会开始生成最大线程数。

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

请检查您是否可以修复有限的队列大小并查看线程是否增加到最大线程数。谢谢