参与者、ForkJoinPool 和消息排序
Actors, ForkJoinPool, and ordering of messages
我需要帮助了解 Actor 系统如何使用 ForkJoinPool 并保持顺序保证。
我一直在玩Actr https://github.com/zakgof/actr,这是一个简单的小型演员系统。我认为我的问题也适用于 Akka。我有一段简单的代码可以发送一个 Actor 编号 1 到 10。Actor 只是打印消息;并且消息不按顺序排列。我得到 1,2,4,3,5,6,8,7,9,10。
我认为这与 ForkJoinPool 有关。 Actr 将消息包装到 Runnable 中并将其发送到 ForkJoin Executor。当任务执行时,它会将消息放入目标 Actor 的队列中并进行处理。我对 ForkJoinPool 的理解是将任务分发到多个线程。我添加了日志记录,消息 1、2、3... 被分发到不同的线程,消息被无序地放入 Actor 的队列。
我错过了什么吗? Actr 的 Scheduler 类似于 Akka 的 Disapatcher,可以在这里找到:https://github.com/zakgof/actr/blob/master/src/main/java/com/zakgof/actr/impl/ExecutorBasedScheduler.java
ExecutorBasedScheduler 是用 ForkJoinPool.commonPool 构造的,如下所示:
public static IActorScheduler newForkJoinPoolScheduler(int throughput) {
return new ExecutorBasedScheduler(ForkJoinPool.commonPool(), throughput);
}
Actor 如何使用 ForkJoinPool 并保持消息有序?
我根本无法与 Actr 通话,但在 Akka 中,单个消息不会创建为 ForkJoinPool 任务。 (每条消息一个任务似乎是一个非常糟糕的方法,原因有很多,不仅仅是排序问题。也就是说,消息通常可以非常快速地处理,如果你每条消息有一个任务,那么开销会非常高。你想要一些批处理,至少在负载下,这样您可以获得更好的线程局部性和更少的开销。)
本质上,在 Akka 中,参与者邮箱是对象内的队列。当邮箱收到一条消息时,它会检查它是否已经安排了一个任务,如果没有,它会向 ForkJoinPool 添加一个新任务。所以 ForkJoinPool 任务不是“处理这条消息”,而是“处理与这个特定 Actor 的邮箱关联的 Runnable”。在任务被安排和 Runnable 运行之前,显然会经过一段时间。当 Runnable 运行时,邮箱可能会收到更多的消息。但是它们只是被添加到队列中,然后 Runnable 将按照它们被接收的顺序处理尽可能多的配置。
这就是为什么在Akka中,你可以保证一个邮箱中消息的顺序,但不能保证发送给不同Actor的消息的顺序。如果我将消息 A 发送给 Actor Alpha,然后将消息 B 发送给 Actor Beta,然后将消息 C 发送给 Actor Alpha,我可以保证 A 将在 C 之前。但是 B 可能在 A 和 C 之前、之后或同时发生.(因为A和C会被同一个任务处理,而B会是不同的任务。)
Messaging Ordering Docs :有关订购的保证内容和不保证内容的更多详细信息。
Dispatcher Docs : Dispatchers 是Actor和实际执行之间的连接。 ForkJoinPool 只是一种实现(尽管是一种非常常见的实现)。
编辑:只是想我会添加一些指向 Akka 源代码的链接来说明。请注意,这些都是内部 API。 tell
就是你如何使用它,这一切都在幕后。 (我正在使用永久链接,这样我的链接就不会失效,但请注意,Akka 可能在您使用的版本中发生了变化。)
关键位在akka.dispatch.Dispatcher.scala
您的 tell
将经过一些步骤才能到达正确的邮箱。但最终:
- dispatch 方法被调用入队。这个很简单,入队然后调用
registerForExecution
方法
- registerForExecution这个方法实际上是先检查是否需要调度。如果它需要调度它使用 executorService 来调度它。请注意,executorService 是抽象的,但是
execute
是在提供邮箱作为参数的服务上调用的。
- execute
如果我们假设实现是 ForkJoinPool,这就是我们最终进入的 executorService 执行方法。本质上,我们只是创建一个 ForkJoinTask,并将提供的参数(邮箱)作为可运行的。
- run 邮箱很方便
Runnable
因此 ForkJoinPool 最终会在安排后调用此方法。您可以看到它处理特殊的系统消息然后调用 processMailbox
然后(最后)再次调用 registerForExecution
。请注意,registerForExecution
首先检查它是否需要调度,因此这不是无限循环,它只是检查是否还有剩余工作要做。当我们在邮箱中时 class 您还可以查看我们在 Dispatcher 中使用的一些方法,以查看是否需要调度、实际将消息添加到队列等。
- processMailbox 本质上只是调用
actor.invoke
的循环,除了它必须做很多检查以查看它是否有系统消息,是否停止工作,是否超过阈值, 如果已经中断等
- invoke 是您编写的代码(receiveMessage)实际被调用的地方。
如果您真的点击了所有这些链接,您会发现我做了很多简化。有很多错误处理和代码来确保一切都是线程安全的、超级高效的和防弹的。但这就是代码流的要点。
我需要帮助了解 Actor 系统如何使用 ForkJoinPool 并保持顺序保证。
我一直在玩Actr https://github.com/zakgof/actr,这是一个简单的小型演员系统。我认为我的问题也适用于 Akka。我有一段简单的代码可以发送一个 Actor 编号 1 到 10。Actor 只是打印消息;并且消息不按顺序排列。我得到 1,2,4,3,5,6,8,7,9,10。
我认为这与 ForkJoinPool 有关。 Actr 将消息包装到 Runnable 中并将其发送到 ForkJoin Executor。当任务执行时,它会将消息放入目标 Actor 的队列中并进行处理。我对 ForkJoinPool 的理解是将任务分发到多个线程。我添加了日志记录,消息 1、2、3... 被分发到不同的线程,消息被无序地放入 Actor 的队列。
我错过了什么吗? Actr 的 Scheduler 类似于 Akka 的 Disapatcher,可以在这里找到:https://github.com/zakgof/actr/blob/master/src/main/java/com/zakgof/actr/impl/ExecutorBasedScheduler.java
ExecutorBasedScheduler 是用 ForkJoinPool.commonPool 构造的,如下所示:
public static IActorScheduler newForkJoinPoolScheduler(int throughput) {
return new ExecutorBasedScheduler(ForkJoinPool.commonPool(), throughput);
}
Actor 如何使用 ForkJoinPool 并保持消息有序?
我根本无法与 Actr 通话,但在 Akka 中,单个消息不会创建为 ForkJoinPool 任务。 (每条消息一个任务似乎是一个非常糟糕的方法,原因有很多,不仅仅是排序问题。也就是说,消息通常可以非常快速地处理,如果你每条消息有一个任务,那么开销会非常高。你想要一些批处理,至少在负载下,这样您可以获得更好的线程局部性和更少的开销。)
本质上,在 Akka 中,参与者邮箱是对象内的队列。当邮箱收到一条消息时,它会检查它是否已经安排了一个任务,如果没有,它会向 ForkJoinPool 添加一个新任务。所以 ForkJoinPool 任务不是“处理这条消息”,而是“处理与这个特定 Actor 的邮箱关联的 Runnable”。在任务被安排和 Runnable 运行之前,显然会经过一段时间。当 Runnable 运行时,邮箱可能会收到更多的消息。但是它们只是被添加到队列中,然后 Runnable 将按照它们被接收的顺序处理尽可能多的配置。
这就是为什么在Akka中,你可以保证一个邮箱中消息的顺序,但不能保证发送给不同Actor的消息的顺序。如果我将消息 A 发送给 Actor Alpha,然后将消息 B 发送给 Actor Beta,然后将消息 C 发送给 Actor Alpha,我可以保证 A 将在 C 之前。但是 B 可能在 A 和 C 之前、之后或同时发生.(因为A和C会被同一个任务处理,而B会是不同的任务。)
Messaging Ordering Docs :有关订购的保证内容和不保证内容的更多详细信息。
Dispatcher Docs : Dispatchers 是Actor和实际执行之间的连接。 ForkJoinPool 只是一种实现(尽管是一种非常常见的实现)。
编辑:只是想我会添加一些指向 Akka 源代码的链接来说明。请注意,这些都是内部 API。 tell
就是你如何使用它,这一切都在幕后。 (我正在使用永久链接,这样我的链接就不会失效,但请注意,Akka 可能在您使用的版本中发生了变化。)
关键位在akka.dispatch.Dispatcher.scala
您的 tell
将经过一些步骤才能到达正确的邮箱。但最终:
- dispatch 方法被调用入队。这个很简单,入队然后调用
registerForExecution
方法 - registerForExecution这个方法实际上是先检查是否需要调度。如果它需要调度它使用 executorService 来调度它。请注意,executorService 是抽象的,但是
execute
是在提供邮箱作为参数的服务上调用的。 - execute 如果我们假设实现是 ForkJoinPool,这就是我们最终进入的 executorService 执行方法。本质上,我们只是创建一个 ForkJoinTask,并将提供的参数(邮箱)作为可运行的。
- run 邮箱很方便
Runnable
因此 ForkJoinPool 最终会在安排后调用此方法。您可以看到它处理特殊的系统消息然后调用processMailbox
然后(最后)再次调用registerForExecution
。请注意,registerForExecution
首先检查它是否需要调度,因此这不是无限循环,它只是检查是否还有剩余工作要做。当我们在邮箱中时 class 您还可以查看我们在 Dispatcher 中使用的一些方法,以查看是否需要调度、实际将消息添加到队列等。 - processMailbox 本质上只是调用
actor.invoke
的循环,除了它必须做很多检查以查看它是否有系统消息,是否停止工作,是否超过阈值, 如果已经中断等 - invoke 是您编写的代码(receiveMessage)实际被调用的地方。
如果您真的点击了所有这些链接,您会发现我做了很多简化。有很多错误处理和代码来确保一切都是线程安全的、超级高效的和防弹的。但这就是代码流的要点。