如何在 ExecutorService 的执行者之间进行同步

How to synchronize between executors of an ExecutorService

我有一个客户端套接字列表,通常大小在 2000 左右。这些客户端是动态的,它们来来去去。

我有一个 ExecutorService,它有一个由 32 个线程组成的固定线程池来处理这些线程。这个执行者服务负责解码和发送要发送给这2000个客户端的消息。

我想防止执行器服务的两个(或多个)线程同时处理同一个客户端。

一种方法是引入另一个簿记线程(所以我最终有 32 + 1 个线程)负责在上一条消息对应时调用 ExecutorService.submit(mesage)对同一客户完成。但是我不确定这是否会引入瓶颈,也就是说这个新引入的簿记线程跟不上提交消息。

理想情况下,我不想预先将线程预分配给一组客户端,因为消息负载在客户端之间分布不均。也是事先不知道的。

这有什么方法?它们是由 java.util.concurrent 功能提供的吗?

更新

这是一个快速总结,因为评论指出存在一些误解:

让每个线程为其自己的队列服务。给插座编号。将每个请求放入队列[socket num % num of threads].

这将确保来自特定套接字的请求按顺序处理。

很遗憾,您无法通过这种方式实现负载平衡。

或者,使用并发哈希映射来存储正在服务的套接字。如果线程为当前正在处理的套接字的请求提供服务,只需将请求放回队列中即可。

当线程 (A) 开始处理消息 (#1) 时,它需要向共享管理器对象注册客户端 ID。对于每个注册的客户,都有一个队列。

当另一个线程 (B) 开始为同一客户端处理消息 (#2) 时,注册将检测到线程 A 已经在处理,并将消息 #2 添加到客户端的队列中。然后线程 B 将停止并处理下一条消息。

当线程 A 处理完消息 #1 后,它会尝试取消注册,但由于消息 #2 是队列,线程 A 将开始处理该消息。之后,当它再次尝试取消注册时,没有排队的消息,线程将停止并处理下一条消息。

正确同步访问取决于管理器代码,因此第二条消息要么由线程 B 处理,要么交给线程 A,而不会丢失。

以上逻辑确保线程 B 不会等待线程 A,即没有空闲时间,并且消息 #2 会尽快处理,即以最小延迟处理,而不会为同一客户端处理两条消息同时。

每个客户端的消息顺序被保留。在全局范围内,当然不会保留消息顺序,因为消息#2 的处理被延迟了。

注意,每个线程只有一个队列,所以只有 32 个队列,并且只有 "duplicate" 条消息在队列中,所以所有队列通常都是空的。


更新

示例:为了便于识别,消息被命名为 clientId.messageId,其中 messageId 是全局的。

消息按以下顺序提交给执行器(3 个线程):

1.1, 2.2, 1.3, 2.4, 3.5, 1.6

  1. 线程 A 启动 1.1 并开始处理。

  2. 线程 B 获取 2.2 并开始处理。

  3. 线程 C 选择 1.3,将其添加到线程 A 的队列中,然后 returns.

  4. 线程 C 选择 2.4,将其添加到线程 B 的队列中,然后 returns.

  5. 线程 C 获取 3.5 并开始处理。

  6. 线程 A 已完成消息 1.1 并开始处理 1.3

  7. 线程 C 已完成消息 3.5 和 returns。

  8. 线程 C 选择 1.6,将其添加到线程 A 的队列中,然后 returns。
    线程 C 现在空闲。

  9. 线程 B 已完成消息 2.2 并开始处理 2.4

  10. 线程 A 已完成消息 1.3 并开始处理 1.6

  11. 线程 B 已完成消息 2.4 和 returns。
    线程 B 现在空闲。

  12. 线程 A 已完成消息 1.6 和 returns。
    线程 A 现在空闲。

您想按顺序处理每个客户端的消息,同时您不想为每个客户端分配单独的线程。这是使用 Actor model 的确切用例。 Actor 就像轻量级线程。它们不像通常的线程那样强大,但非常适合像您这样的可重复任务。 如果你觉得 Google 找到的 java actor libraries 太重量级了,你可以使用我的 Github 库中的 most compact actor implementation,或者查看我的异步库中包含的扩展 actor 实现 df4j.