具有大小受限资源池的多线程 Java 工作者

Multithreaded Java worker with a size restricted resource pool

我有这个 'Worker' class,它使用资源 'Client'。 在任何给定时间可能有任意数量的线程,运行 'Worker'。 'Client' 不是线程安全的,因此我为它使用 'ThreadLocal'。 'Client' 连接到某个服务器并执行工作人员提供给 'Client' 的 HTTP 'Request'。

public class Worker {
// Client is NOT thread-safe !!! 
private static ThreadLocal<Client> client = new ThreadLocal<Client>();

@Override
protected void onGet(Request req) {
    handleRequest(req);
}

private void handleRequest(Request req) {
    someRunnableExecutor(new Runnable() {
        @Override
        public void run() {
            get_client().send_req(req);
        }
    });
}

private Client get_client() {
    Client c = client.get();
    if (c == null) {
       c = new Client();
       client.set(c);
    }
    return c;
}

在当前的实现(上图)中,为了清晰起见,"active" 'Clients' 与 运行 'Workers'.[=13= 一样多]

这是一个问题,因为服务器即将耗尽。

我能做的就是修复 'Worker'。无法访问 'Client'、服务器或运行 worker 的执行程序。

我想要做的是在 'Worker' 中有一个 'Client'(s) 的队列和一段同步代码,它从 'Client' Queue,如果 Queue 为空,'Worker' 应该等到 Queue 中有一个供他使用。然后将 'Client' 放回队列 - 同样同步。

我真的想让它尽可能简单,尽可能减少对代码的更改。

没有新的 classes,没有工厂,只有一些数据结构来保存 'Client'(s) 和同步。

我对如何实现这一点感到有点困惑,而且 'Client' 不是线程安全的,我必须 'ThreadLocal'(ize) 它。这是我如何将其放入队列中的方法吗?

private static Queue<ThreadLocal<CLient>> queue = 
      new LinkedList<ThreadLocal<CLient>>();

此外,how/where 我是否初始化该队列一次,比如 5 个客户端?

请分享您的想法。

这里不需要 ThreadLocal,因为您希望 Clients 少于 Workers。 BlockingQueue 中您需要的一切。 注意!我认为客户端的 send_req 是同步的,如果不是的话 - 代码需要对 运行() 方法

进行一些更改
public class Worker {

    private static final int CLIENTS_NUMBER = 5;
    private static final BlockingQueue<Client> queue = new LinkedBlockingQueue<>(CLIENTS_NUMBER);

    static {
        for (int i = 0; i < CLIENTS_NUMBER; i++)
            queue.put(new Client());
    }

    @Override
    protected void onGet(Request req) {
        handleRequest(req);
    }

    private void handleRequest(Request req) {
        someRunnableExecutor(new Runnable() {
            @Override
            public void run() {
                try {
                    Client client = takeClient();
                    client.send_req(req);
                    putClient(client);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    private Client takeClient() throws InterruptedException {
        return queue.take();
    }

    private void putClient(Client client) throws InterruptedException {
        queue.put(client);
    }
}