Java 具有可变输入和线程数的数组分区

Java array partitioning with variable input and thread count

我有一个整数数组作为输入,它的大小可以在 1 到 10^6 之间变化:

例如

int inputArr = new int[]{1, 2, 3, 4, 5};

我在 运行 时动态设置线程数,如下所示:

int threadCount = Runtime.getRuntime().availableProcessors() * 2;

线程数可以从 4 到 64 不等。这意味着,线程数可能甚至大于 inputArr 的大小,具体取决于平台。

由于 inputArr 也可以变得相当大,我想在小块中并行处理 inputArr

到目前为止,我已经成功创建了以下内容

public class Main {
    public static void main(String[] args) {
        int threadCount = Runtime.getRuntime().availableProcessors() * 2;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        final var inputArr = new int[]{1, 2, 4, 5};

        for (int i = 0; i < threadCount; i++) {
            executor.submit(new Worker(inputArr, i * threadCount, (i + 1) * threadCount));
        }

        executor.shutdown();
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static class Worker implements Runnable {
        private final int[] arr;
        private final int start;
        private final int end;

        public Worker(int[] arr, int startIndex, int endIndex) {
            this.arr = arr;
            this.start = startIndex;
            this.end = endIndex;
        }

        @Override
        public void run() {
            System.out.printf("Thread start: %d\n", Thread.currentThread().getId());
            for (int i = this.start; i < this.end; i++) {
                System.out.println(arr[i]);
            }
            System.out.printf("Thread end: %d\n", Thread.currentThread().getId());
        }
    }
}

输出类似于:

...
Thread start: 27
Thread start: 34
Thread start: 33
Thread start: 24
1
2
4
5
Thread start: 30
Thread start: 26
...

很明显,这里创建了很多线程。远远超过我这个例子中inputArr的大小。

但是,我希望输出看起来像这样:

...
Thread start: X
1
2
4
5
Thread end: X
...

我认为我的问题是数组的分区错误。我究竟做错了什么?有人可以帮我吗

您的分区看起来不正确,是的。

而不是

for (int i = 0; i < threadCount; i++) {
  executor.submit(new Worker(inputArr, i * threadCount, (i + 1) * threadCount));
}

试试这个

final var batchSize = inputArr.length / threadCount == 0 ? inputArr.length : inputArr.length / threadCount;
final var remainder = inputArr.length % threadCount;
final int partitionSize = threadCount > batchSize ? 1 : threadCount;
for (int i = 0; i < partitionSize; i++) {
    int endIndex = (i + 1) * batchSize;
    executor.submit(new Worker(inputArr, i * batchSize, endIndex > inputArr.length ? remainder : endIndex));
}