Java 可调用线程:保持配置

Java callable threading: keep configuration

我正在设置一个服务器 (Radius) 的模拟器(用于测试),它使用线程将查询发送到另一个服务器 (LDAP)。 查询需要以每秒 x 的速度执行。 为此,我正在使用可调用的预定线程池执行程序,以便我可以创建可调用对象并将它们提交到线程池以供执行。 每个线程都应该打开自己的连接并使用它来查询。 问题是我希望连接在每次使用时都被同一个线程重新使用。

澄清一下:

如果我有一个 20 线程池,我希望创建和使用 20 个连接。 (所以我可以发送 10.000 个查询,这些查询将由 20 个 threads/connections 轮流处理)。

现在,要连接的 (LDAP) 服务器信息被发送到可调用对象的构造函数,可调用对象建立连接以供执行。此后,我使用未来的可调用系统检索结果。 这个问题是每次我创建一个可调用的连接被打开(当然后来关闭)。

我正在寻找使连接保持活动状态并为每个线程重新使用它们的最佳实践。

我想过一些方法来实现这个,但它们似乎不是很有效:

实现这个最有效的方法是什么? <- 旧问题,请参阅新问题的编辑。

编辑: 我在想因为我不能安全地获得一个线程号,但是 threadId 总是唯一的,我可以只使用

map<String/threadId, connection>

并将整个地图(参考)传递给可调用对象。这样我就可以使用类似的东西:(伪代码)

Connection con = map.get(this.getThreadId());
If (con == null){
  con = new Connection(...);
  map.put(this.getThreadId(), con)
}

也可以将地图设为静态,然后静态访问它。这样我就不必将地图传递给 Callable。 这至少是安全的,不会强迫我重构我的代码。

新问题: 什么会更符合最佳实践;上面的方案还是Zim-Zam的方案? 如果以上是最好的,去静态解决方案还是更好

我将使用在 Callables 之间共享的 BlockingQueue 来实现这一点,ScheduledThreadPoolExecutor 每秒将 x 查询放入 BlockingQueue

public class Worker implements Runnable {
    private final BlockingQueue<Query> inbox;
    private final BlockingQueue<Result> outbox;

    public Worker(BlockingQueue<Query> inbox, BlockingQueue<Result> outbox) {
        // create LDAP connection
        this.inbox = inbox;
        this.outbox = outbox;
    }

    public void run() {
        try {
            while(true) {
                // waits for a Query to be available
                Query query = inbox.take();
                // execute query
                outbox.add(new Result(/* result */));
            }
        } catch(InterruptedException e) {
          // log and restart? close LDAP connection and return?
        }
    }
}

public class Master {
   private final int x; // number of queries per second
   private final BlockingQueue<Query> outbox = new ArrayBlockingQueue<>(4 * x);
   private final BlockingQueue<Result> inbox = new ArrayBlockingQueue<>(4 * x);
   private final ScheduledThreadPoolExecutor executor;
   private final List<Future<?>> workers = new ArrayList<>(20);
   private final Future<?> receiver;

   public Master() {
     // initialize executor
     for(int i = 0; i < 20; i++) {
         Worker worker = new Worker(inbox, outbox);
         workers.add(executor.submit(worker));
     }

     receiver = executor.submit(new Runnable() {
         public void run() {
           while(!Thread.interrupted()) {
             try {
               Result result = inbox.take();
               // process result
             } catch(InterruptedException e) {
               return;
             }
           }
         }
     }
   }

   executor.scheduleWithFixedDelay(new Runnable() {
       public void run() {
           // add x queries to the queue
       }
   }, 0, 1, TimeUnit.SECONDS);
}

使用 BlockingQueue#add 将新的 Queries 添加到 outbox,如果这会引发异常,那么您的队列已满,您需要降低查询创建率 and/or 创造更多工人。要打破工作人员对其 Future 的无限循环调用 cancel(true),这将在 Worker.

内部抛出一个 InterruptedException