如何在 ExecutorService 的执行者之间进行同步
How to synchronize between executors of an ExecutorService
我有一个客户端套接字列表,通常大小在 2000 左右。这些客户端是动态的,它们来来去去。
我有一个 ExecutorService
,它有一个由 32 个线程组成的固定线程池来处理这些线程。这个执行者服务负责解码和发送要发送给这2000个客户端的消息。
我想防止执行器服务的两个(或多个)线程同时处理同一个客户端。
一种方法是引入另一个簿记线程(所以我最终有 32 + 1 个线程)负责在上一条消息对应时调用 ExecutorService.submit(mesage)
对同一客户完成。但是我不确定这是否会引入瓶颈,也就是说这个新引入的簿记线程跟不上提交消息。
理想情况下,我不想预先将线程预分配给一组客户端,因为消息负载在客户端之间分布不均。也是事先不知道的。
这有什么方法?它们是由 java.util.concurrent
功能提供的吗?
更新
这是一个快速总结,因为评论指出存在一些误解:
我不希望每个客户端一个线程,因为我最终会得到 2000 个线程。
理想情况下,我不想将线程预分配给一组客户端,因为消息速率在所有客户端之间分布不均,而且事先不知道。
必须保留消息顺序。
我认为线程 A
正在等待线程 B
是不好的,因为 B
已经在向同一个客户端发送消息。换句话说,始终只有一个线程在处理一个客户端。
让每个线程为其自己的队列服务。给插座编号。将每个请求放入队列[socket num % num of threads].
这将确保来自特定套接字的请求按顺序处理。
很遗憾,您无法通过这种方式实现负载平衡。
或者,使用并发哈希映射来存储正在服务的套接字。如果线程为当前正在处理的套接字的请求提供服务,只需将请求放回队列中即可。
当线程 (A) 开始处理消息 (#1) 时,它需要向共享管理器对象注册客户端 ID。对于每个注册的客户,都有一个队列。
当另一个线程 (B) 开始为同一客户端处理消息 (#2) 时,注册将检测到线程 A 已经在处理,并将消息 #2 添加到客户端的队列中。然后线程 B 将停止并处理下一条消息。
当线程 A 处理完消息 #1 后,它会尝试取消注册,但由于消息 #2 是队列,线程 A 将开始处理该消息。之后,当它再次尝试取消注册时,没有排队的消息,线程将停止并处理下一条消息。
正确同步访问取决于管理器代码,因此第二条消息要么由线程 B 处理,要么交给线程 A,而不会丢失。
以上逻辑确保线程 B 不会等待线程 A,即没有空闲时间,并且消息 #2 会尽快处理,即以最小延迟处理,而不会为同一客户端处理两条消息同时。
每个客户端的消息顺序被保留。在全局范围内,当然不会保留消息顺序,因为消息#2 的处理被延迟了。
注意,每个线程只有一个队列,所以只有 32 个队列,并且只有 "duplicate" 条消息在队列中,所以所有队列通常都是空的。
更新
示例:为了便于识别,消息被命名为 clientId.messageId
,其中 messageId
是全局的。
消息按以下顺序提交给执行器(3 个线程):
1.1, 2.2, 1.3, 2.4, 3.5, 1.6
线程 A 启动 1.1
并开始处理。
线程 B 获取 2.2
并开始处理。
线程 C 选择 1.3
,将其添加到线程 A 的队列中,然后 returns.
线程 C 选择 2.4
,将其添加到线程 B 的队列中,然后 returns.
线程 C 获取 3.5
并开始处理。
线程 A 已完成消息 1.1
并开始处理 1.3
。
线程 C 已完成消息 3.5
和 returns。
线程 C 选择 1.6
,将其添加到线程 A 的队列中,然后 returns。
线程 C 现在空闲。
线程 B 已完成消息 2.2
并开始处理 2.4
。
线程 A 已完成消息 1.3
并开始处理 1.6
。
线程 B 已完成消息 2.4
和 returns。
线程 B 现在空闲。
线程 A 已完成消息 1.6
和 returns。
线程 A 现在空闲。
您想按顺序处理每个客户端的消息,同时您不想为每个客户端分配单独的线程。这是使用 Actor model 的确切用例。 Actor 就像轻量级线程。它们不像通常的线程那样强大,但非常适合像您这样的可重复任务。
如果你觉得 Google 找到的 java actor libraries
太重量级了,你可以使用我的 Github 库中的 most compact actor implementation,或者查看我的异步库中包含的扩展 actor 实现 df4j
.
我有一个客户端套接字列表,通常大小在 2000 左右。这些客户端是动态的,它们来来去去。
我有一个 ExecutorService
,它有一个由 32 个线程组成的固定线程池来处理这些线程。这个执行者服务负责解码和发送要发送给这2000个客户端的消息。
我想防止执行器服务的两个(或多个)线程同时处理同一个客户端。
一种方法是引入另一个簿记线程(所以我最终有 32 + 1 个线程)负责在上一条消息对应时调用 ExecutorService.submit(mesage)
对同一客户完成。但是我不确定这是否会引入瓶颈,也就是说这个新引入的簿记线程跟不上提交消息。
理想情况下,我不想预先将线程预分配给一组客户端,因为消息负载在客户端之间分布不均。也是事先不知道的。
这有什么方法?它们是由 java.util.concurrent
功能提供的吗?
更新
这是一个快速总结,因为评论指出存在一些误解:
我不希望每个客户端一个线程,因为我最终会得到 2000 个线程。
理想情况下,我不想将线程预分配给一组客户端,因为消息速率在所有客户端之间分布不均,而且事先不知道。
必须保留消息顺序。
我认为线程
A
正在等待线程B
是不好的,因为B
已经在向同一个客户端发送消息。换句话说,始终只有一个线程在处理一个客户端。
让每个线程为其自己的队列服务。给插座编号。将每个请求放入队列[socket num % num of threads].
这将确保来自特定套接字的请求按顺序处理。
很遗憾,您无法通过这种方式实现负载平衡。
或者,使用并发哈希映射来存储正在服务的套接字。如果线程为当前正在处理的套接字的请求提供服务,只需将请求放回队列中即可。
当线程 (A) 开始处理消息 (#1) 时,它需要向共享管理器对象注册客户端 ID。对于每个注册的客户,都有一个队列。
当另一个线程 (B) 开始为同一客户端处理消息 (#2) 时,注册将检测到线程 A 已经在处理,并将消息 #2 添加到客户端的队列中。然后线程 B 将停止并处理下一条消息。
当线程 A 处理完消息 #1 后,它会尝试取消注册,但由于消息 #2 是队列,线程 A 将开始处理该消息。之后,当它再次尝试取消注册时,没有排队的消息,线程将停止并处理下一条消息。
正确同步访问取决于管理器代码,因此第二条消息要么由线程 B 处理,要么交给线程 A,而不会丢失。
以上逻辑确保线程 B 不会等待线程 A,即没有空闲时间,并且消息 #2 会尽快处理,即以最小延迟处理,而不会为同一客户端处理两条消息同时。
每个客户端的消息顺序被保留。在全局范围内,当然不会保留消息顺序,因为消息#2 的处理被延迟了。
注意,每个线程只有一个队列,所以只有 32 个队列,并且只有 "duplicate" 条消息在队列中,所以所有队列通常都是空的。
更新
示例:为了便于识别,消息被命名为 clientId.messageId
,其中 messageId
是全局的。
消息按以下顺序提交给执行器(3 个线程):
1.1, 2.2, 1.3, 2.4, 3.5, 1.6
线程 A 启动
1.1
并开始处理。线程 B 获取
2.2
并开始处理。线程 C 选择
1.3
,将其添加到线程 A 的队列中,然后 returns.线程 C 选择
2.4
,将其添加到线程 B 的队列中,然后 returns.线程 C 获取
3.5
并开始处理。线程 A 已完成消息
1.1
并开始处理1.3
。线程 C 已完成消息
3.5
和 returns。线程 C 选择
1.6
,将其添加到线程 A 的队列中,然后 returns。
线程 C 现在空闲。线程 B 已完成消息
2.2
并开始处理2.4
。线程 A 已完成消息
1.3
并开始处理1.6
。线程 B 已完成消息
2.4
和 returns。
线程 B 现在空闲。线程 A 已完成消息
1.6
和 returns。
线程 A 现在空闲。
您想按顺序处理每个客户端的消息,同时您不想为每个客户端分配单独的线程。这是使用 Actor model 的确切用例。 Actor 就像轻量级线程。它们不像通常的线程那样强大,但非常适合像您这样的可重复任务。
如果你觉得 Google 找到的 java actor libraries
太重量级了,你可以使用我的 Github 库中的 most compact actor implementation,或者查看我的异步库中包含的扩展 actor 实现 df4j
.