固定和动态调整工作线程的数量

Fixing and dynamically adjusting the number of worker-threads

我目前正在实现一个具有 addRecord(最终 Redcord 记录)和 close() 方法的数据容器/数据结构。

public class Container{

    Container() {
    }

    public void addRecord(final Record record){
        // do something with the record
        // add the modified it to the container
    }

    public void close(){

    }     
}

由于要将记录存储到容器中需要完成几个步骤,因此我想在最终将它们添加到容器之前将这些步骤外包给线程。

我的问题是我不希望任何容器实例使用超过 4 个线程,如果我同时打开多个容器实例,总共不超过,例如,12 个线程应该是用过的。除此之外,我希望在创建第 4 个容器后,其他 3 个打开的容器中的每一个都应该失去一个线程,这样所有 4 个容器都可以使用 3 个线程。

我正在研究 ThreadPools 和 ExecutorServices。虽然设置大小为 12 的线程池很简单,但在使用 ExecutorServices 时很难将使用的线程数限制为 4。回想一下,我最多需要 12 个线程,这就是为什么我有一个在容器实例之间共享的大小为 12 的静态实例。

这就是我的起点

public class Container{

    public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);

    final List<Future<Entry>> _fRecords = new LinkedList<>();

    Container() {
    }

    public void addRecord(final Record record){
        _fRecords.add(SERVICE.submit(new Callable<Entry>{

            @Override
            public Entry call() throws Exception {
                return record.createEntry();
            }
        }));
    }

    public void close() throws Exception {
        for(final Future<Entry> f) {
            f.get(); // and add the entry to the container           
        }
    }     
}

显然,这不能确保单个实例最多使用 4 个线程。此外,如果另一个容器需要几个线程,我看不出如何强制这些 Executorservices 关闭工作人员或重新分配他们。

问题:

  1. 由于Redord本身是一个接口,createEntry()的运行时间取决于实际实现,所以我要保证不同的容器不会使用相同的threads/workers,也就是说,我不希望任何线程为两个不同的容器工作。这背后的想法是,我不希望仅处理“长 运行”记录的容器实例被仅处理“短 运行 记录”的容器减慢速度。如果这在概念上没有意义(向您提问),则无需为不同的容器隔离 threads/workers。

  2. 实现我自己的 ExecutorService 是正确的方法吗?有人可以指出一个具有类似 problem/solution 的项目吗?否则我想我必须实现自己的 ExecutorService 并在我的容器中共享它,或者有更好的解决方案吗?

  3. 目前,我正在 close 方法中收集所有期货,因为容器必须按照它们到达的顺序存储 records/entries。显然,这需要很多时间,因此我想在 Future 完成后执行此操作。让一个额外的 worker 单独处理 futures 是否有意义,或者让我的 worker 做这项工作更好,即进行线程交互。

我不认为 Java 运行时提供了开箱即用的东西。

我自己编写了 class 来满足此类需求(公共池 + 给定队列的限制)。

public static class ThreadLimitedQueue {
    private final ExecutorService executorService;
    private final int limit;

    private final Object lock = new Object();
    private final LinkedList<Runnable> pending = new LinkedList<>();
    private int inProgress = 0;

    public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
        this.executorService = executorService;
        this.limit = limit;
    }

    public void submit(Runnable runnable) {
        final Runnable wrapped = () -> {
            try {
                runnable.run();
            } finally {
                onComplete();
            }
        };
        synchronized (lock) {
            if (inProgress < limit) {
                inProgress++;
                executorService.submit(wrapped);
            } else {
                pending.add(wrapped);
            }
        }
    }

    private void onComplete() {
        synchronized (lock) {
            final Runnable pending = this.pending.poll();
            if (pending == null || inProgress > limit) {
                inProgress--;
            } else {
                executorService.submit(pending);
            }
        }
    }
}

在我的案例中唯一的区别 limit 是不变的,但您可以修改它。例如,您可以将 int limit 替换为 Supplier<Integer> limitFunction 并且此功能必须提供动态限制,例如

Supplier<Integer> limitFunction = 12 / containersList.size();

只是让它更健壮(例如,如果 containersList 为空或超过 12,该怎么办)