具有大小受限资源池的多线程 Java 工作者
Multithreaded Java worker with a size restricted resource pool
我有这个 'Worker' class,它使用资源 'Client'。
在任何给定时间可能有任意数量的线程,运行 'Worker'。
'Client' 不是线程安全的,因此我为它使用 'ThreadLocal'。
'Client' 连接到某个服务器并执行工作人员提供给 'Client' 的 HTTP 'Request'。
public class Worker {
// Client is NOT thread-safe !!!
private static ThreadLocal<Client> client = new ThreadLocal<Client>();
@Override
protected void onGet(Request req) {
handleRequest(req);
}
private void handleRequest(Request req) {
someRunnableExecutor(new Runnable() {
@Override
public void run() {
get_client().send_req(req);
}
});
}
private Client get_client() {
Client c = client.get();
if (c == null) {
c = new Client();
client.set(c);
}
return c;
}
在当前的实现(上图)中,为了清晰起见,"active" 'Clients' 与 运行 'Workers'.[=13= 一样多]
这是一个问题,因为服务器即将耗尽。
我能做的就是修复 'Worker'。无法访问 'Client'、服务器或运行 worker 的执行程序。
我想要做的是在 'Worker' 中有一个 'Client'(s) 的队列和一段同步代码,它从 'Client' Queue,如果 Queue 为空,'Worker' 应该等到 Queue 中有一个供他使用。然后将 'Client' 放回队列 - 同样同步。
我真的想让它尽可能简单,尽可能减少对代码的更改。
没有新的 classes,没有工厂,只有一些数据结构来保存 'Client'(s) 和同步。
我对如何实现这一点感到有点困惑,而且 'Client' 不是线程安全的,我必须 'ThreadLocal'(ize) 它。这是我如何将其放入队列中的方法吗?
private static Queue<ThreadLocal<CLient>> queue =
new LinkedList<ThreadLocal<CLient>>();
此外,how/where 我是否初始化该队列一次,比如 5 个客户端?
请分享您的想法。
这里不需要 ThreadLocal,因为您希望 Clients 少于 Workers。 BlockingQueue 中您需要的一切。
注意!我认为客户端的 send_req 是同步的,如果不是的话 - 代码需要对 运行() 方法
进行一些更改
public class Worker {
private static final int CLIENTS_NUMBER = 5;
private static final BlockingQueue<Client> queue = new LinkedBlockingQueue<>(CLIENTS_NUMBER);
static {
for (int i = 0; i < CLIENTS_NUMBER; i++)
queue.put(new Client());
}
@Override
protected void onGet(Request req) {
handleRequest(req);
}
private void handleRequest(Request req) {
someRunnableExecutor(new Runnable() {
@Override
public void run() {
try {
Client client = takeClient();
client.send_req(req);
putClient(client);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
private Client takeClient() throws InterruptedException {
return queue.take();
}
private void putClient(Client client) throws InterruptedException {
queue.put(client);
}
}
我有这个 'Worker' class,它使用资源 'Client'。 在任何给定时间可能有任意数量的线程,运行 'Worker'。 'Client' 不是线程安全的,因此我为它使用 'ThreadLocal'。 'Client' 连接到某个服务器并执行工作人员提供给 'Client' 的 HTTP 'Request'。
public class Worker {
// Client is NOT thread-safe !!!
private static ThreadLocal<Client> client = new ThreadLocal<Client>();
@Override
protected void onGet(Request req) {
handleRequest(req);
}
private void handleRequest(Request req) {
someRunnableExecutor(new Runnable() {
@Override
public void run() {
get_client().send_req(req);
}
});
}
private Client get_client() {
Client c = client.get();
if (c == null) {
c = new Client();
client.set(c);
}
return c;
}
在当前的实现(上图)中,为了清晰起见,"active" 'Clients' 与 运行 'Workers'.[=13= 一样多]
这是一个问题,因为服务器即将耗尽。
我能做的就是修复 'Worker'。无法访问 'Client'、服务器或运行 worker 的执行程序。
我想要做的是在 'Worker' 中有一个 'Client'(s) 的队列和一段同步代码,它从 'Client' Queue,如果 Queue 为空,'Worker' 应该等到 Queue 中有一个供他使用。然后将 'Client' 放回队列 - 同样同步。
我真的想让它尽可能简单,尽可能减少对代码的更改。
没有新的 classes,没有工厂,只有一些数据结构来保存 'Client'(s) 和同步。
我对如何实现这一点感到有点困惑,而且 'Client' 不是线程安全的,我必须 'ThreadLocal'(ize) 它。这是我如何将其放入队列中的方法吗?
private static Queue<ThreadLocal<CLient>> queue =
new LinkedList<ThreadLocal<CLient>>();
此外,how/where 我是否初始化该队列一次,比如 5 个客户端?
请分享您的想法。
这里不需要 ThreadLocal,因为您希望 Clients 少于 Workers。 BlockingQueue 中您需要的一切。 注意!我认为客户端的 send_req 是同步的,如果不是的话 - 代码需要对 运行() 方法
进行一些更改public class Worker {
private static final int CLIENTS_NUMBER = 5;
private static final BlockingQueue<Client> queue = new LinkedBlockingQueue<>(CLIENTS_NUMBER);
static {
for (int i = 0; i < CLIENTS_NUMBER; i++)
queue.put(new Client());
}
@Override
protected void onGet(Request req) {
handleRequest(req);
}
private void handleRequest(Request req) {
someRunnableExecutor(new Runnable() {
@Override
public void run() {
try {
Client client = takeClient();
client.send_req(req);
putClient(client);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
private Client takeClient() throws InterruptedException {
return queue.take();
}
private void putClient(Client client) throws InterruptedException {
queue.put(client);
}
}