Java array partitioning with variable input and thread count
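I have an integer array as input whose size can vary between 1 and 10^6, for example:
int[] inputArr = new int[]{1, 2, 3, 4, 5};
I set the thread count dynamically at runtime, as follows:
int threadCount = Runtime.getRuntime().availableProcessors() * 2;
The thread count can range from 4 to 64, so depending on the platform it may even be larger than the size of inputArr.
Since inputArr can also become quite large, I want to process it in parallel in small chunks.
So far I have managed to put together the following: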
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        int threadCount = Runtime.getRuntime().availableProcessors() * 2;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        final var inputArr = new int[]{1, 2, 4, 5};
        for (int i = 0; i < threadCount; i++) {
            executor.submit(new Worker(inputArr, i * threadCount, (i + 1) * threadCount));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static class Worker implements Runnable {
        private final int[] arr;
        private final int start;
        private final int end;

        public Worker(int[] arr, int startIndex, int endIndex) {
            this.arr = arr;
            this.start = startIndex;
            this.end = endIndex;
        }

        @Override
        public void run() {
            System.out.printf("Thread start: %d\n", Thread.currentThread().getId());
            for (int i = this.start; i < this.end; i++) {
                System.out.println(arr[i]);
            }
            System.out.printf("Thread end: %d\n", Thread.currentThread().getId());
        }
    }
}
The output looks something like this:
...
Thread start: 27
Thread start: 34
Thread start: 33
Thread start: 24
1
2
4
5
Thread start: 30
Thread start: 26
...
Clearly, a lot of threads are started here, far more than the size of inputArr in my example.
However, I would like the output to look like this:
...
Thread start: X
1
2
4
5
Thread end: X
...
I think my problem is that the array is partitioned incorrectly. What exactly am I doing wrong? Can anyone help me?
Yes, your partitioning looks incorrect.
Instead of
for (int i = 0; i < threadCount; i++) {
    executor.submit(new Worker(inputArr, i * threadCount, (i + 1) * threadCount));
}
try this:
// Use a single chunk when the array is smaller than the thread count, otherwise one chunk per thread.
final int partitionCount = inputArr.length < threadCount ? 1 : threadCount;
final int batchSize = inputArr.length / partitionCount;
final int remainder = inputArr.length % partitionCount;
int startIndex = 0;
for (int i = 0; i < partitionCount; i++) {
    // The first `remainder` chunks take one extra element so nothing is left over.
    int endIndex = startIndex + batchSize + (i < remainder ? 1 : 0);
    executor.submit(new Worker(inputArr, startIndex, endIndex));
    startIndex = endIndex;
}
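To check the index arithmetic without starting any threads, it can help to compute the [start, end) ranges on their own and print them. The following is only a minimal sketch of one possible scheme. It assumes that an array smaller than the thread count should go to a single chunk, that leftover elements are spread over the first chunks, and that threadCount is at least 1. The class name PartitionSketch and the method partition are hypothetical and not part of the code in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartitionSketch {

    // Hypothetical helper: returns [start, end) pairs that cover 0..length exactly once.
    // Assumes threadCount >= 1.
    static List<int[]> partition(int length, int threadCount) {
        // Arrays smaller than the thread count go to a single chunk.
        int chunkCount = length < threadCount ? 1 : threadCount;
        int batchSize = length / chunkCount;
        int remainder = length % chunkCount;
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < chunkCount; i++) {
            // The first `remainder` chunks take one extra element.
            int end = start + batchSize + (i < remainder ? 1 : 0);
            ranges.add(new int[]{start, end});
            start = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints one [start, end] pair per chunk for 10 elements and 4 threads.
        for (int[] range : partition(10, 4)) {
            System.out.println(Arrays.toString(range));
        }
    }
}
```

For 10 elements and 4 threads this prints [0, 3], [3, 6], [6, 8] and [8, 10], one pair per line. Each pair can be passed as the startIndex and endIndex of a Worker, and for an array of 4 elements with 16 threads the result is the single range [0, 4], which matches the output you are expecting.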