Actor 有一段时间没有处理消息

Actor not processing messages for quite some time

在我的 akka 应用程序中,我使用一个主角色作为控制器,它接收命令并将其委托给处理器角色。处理器角色完成后(完成每个任务大约需要 2 分钟),将消息传递给控制器​​,然后然后控制器 actor ,将消息发送到数据库 actor 以进行持久化。处理器 actor 和 db 数据库 actor 都使用路由器管理,每个路由器都有 5 个路由。我使用的是默认调度程序,所有其他 akka 配置都是默认的。下面是场景。

控制器 actor 正在接收大约 100 条传递给处理器 actor 的消息,我可以从日志中看到处理器已经完成处理一些消息(大约 5 条)并将完成消息传递给控制器​​ actor。但是数据库 actor 在大约 5 分钟后开始执行。然而,在这 5 分钟内,处理器 actor 正在处理其未决消息。所以它不像应用程序处于空闲状态。

当消息量较小时,从控制器->处理器->控制器->db actor的流几乎是瞬时的,几乎没有任何延迟。

我不希望处理后出现这种延迟,数据库执行应该在处理完成后立即执行。但是似乎线程正忙于执行处理器 task.How 我可以克服这种情况吗,理想情况下我希望我的任务执行的周转时间更少,但由于上述行为我无法实现它。

默认情况下,所有 Akka actor 使用相同的执行器,最多只能使用 64 个线程。来自 https://doc.akka.io/docs/akka/current/general/configuration-reference.html :

# This will be used if you have set "executor = "default-executor"".
      # If an ActorSystem is created with a given ExecutionContext, this
      # ExecutionContext will be used as the default executor for all
      # dispatchers in the ActorSystem configured with
      # executor = "default-executor". Note that "default-executor"
      # is the default value for executor, and therefore used if not
      # specified otherwise. If no ExecutionContext is given,
      # the executor configured in "fallback" will be used.
      default-executor {
        fallback = "fork-join-executor"
      }

fork-join-executor配置:

# This will be used if you have set "executor = "fork-join-executor""
      # Underlying thread pool implementation is java.util.concurrent.ForkJoinPool
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 8

        # The parallelism factor is used to determine thread pool size using the
        # following formula: ceil(available processors * factor). Resulting size
        # is then bounded by the parallelism-min and parallelism-max values.
        parallelism-factor = 1.0

        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 64

        # Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack
        # like peeking mode which "pop".
        task-peeking-mode = "FIFO"
      }

该问题可能与处理器参与者中的阻塞调用有关。 Akka 从 64 个池中分配单独的线程来处理处理器参与者中的这些阻塞调用,并等待其中一个完成消息处理,以便能够为其他参与者处理消息。这可能会导致您在演员之间观察到时间滞后。

Akka 所基于的一个关键方面是系统应始终保持响应。如果您将相同的 dispatcher/thread 池用于阻塞数据库操作或处理消息作为您的主要 Akka 路由基础设施,则所有 Akka 线程都可能被处理参与者或数据库操作占用,并且您的系统将有效直到其中一个阻塞操作完成时才会死锁。对于单个 JVM 上仅执行此任务的简单系统来说,这可能不是问题,但是当它扩展时,可能会导致很多问题。

在像您这样无法避免阻塞的情况下,应该使用专门的调度程序来阻塞操作。这个link讲的就是这方面(虽然是为Akka-Http起的标题,可以概括一下)。您可以创建两种类型的调度程序来处理两种不同的阻塞操作。我也认为你应该限制你的阻塞请求,以免压倒你的系统(使用调度程序进行限制)。您还可以在 actor 中实现缓冲区来处理背压情况。

编辑

控制器邮箱有 100 条消息,其中 5 条消息被接收并委托给处理器参与者。每个处理器参与者花费 2 分钟的时间将响应发送回控制器,响应在控制器的邮箱中排队。但是在处理这些消息之前,控制器需要处理在这些消息之前添加的消息,有效地增加了控制器处理消息的服务时间。滞后是整个过程的高潮。一旦控制器收到响应消息,它就会委托给 actor.I 认为处理时间随着消息的增加而增加。

如果有帮助请告诉我!!