ForkJoinPool BufferedImage 处理方式
ForkJoinPool BufferedImage Processing Style
我正在尝试使用 java 中的 ForkJoinPool 处理图像。我使用流对图像进行一些自定义操作。我正在尝试将 ForkJoinPool 用于 getRGB
和 setRGB
方法。如何在 getRGB
方法上实现并行性?
@Override
public int[] getRGB(int xStart, int yStart, int w, int h, int[] rgbArray,int offset, int scansize) {
int[][] sol = new int[h][w];
int threshold = w;
class RecursiveSetter extends RecursiveAction {
int from;
int to;
FJBufferedImage image;
RecursiveSetter(int from, int to, FJBufferedImage image) {
this.from = from;
this.to = to;
this.image = image;
}
@Override
protected void compute() {
System.out.println("From : " + from + " To : " + to);
if (from >= to) return;
if (to - from == 1) {
computeDirectly(from);
return;
} else {
int mid = from + (to - from) / 2;
System.out.println("From : " + from + " To : " + to +
"Mid :" + mid);
invokeAll(
new RecursiveSetter(from, mid, image),
new RecursiveSetter(mid + 1, to, image));
return;
}
}
void computeDirectly(int row) {
sol[from] = image.getRealRGB(from, 0, w, 1, null, offset,
scansize);
}
}
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.invoke(new RecursiveSetter(0, h-1, this));
return Arrays.stream(sol)
.flatMapToInt(Arrays::stream)
.toArray();
}
getRealRGB
只是代理了BufferedImage
的方法。我知道这可能不切实际,但我只想知道如何在这种情况下使用 ForkJoinPool。是的,上面的代码抛出 ArrayIndexOutOfBound
异常。请就如何拆分工作负载(行、列、小网格。现在,我正在按行拆分)以及如何确定阈值提出建议。
首先对您的尝试发表一些评论:
int[][] sol = new int[h][w];
这里您正在创建一个二维数组,它在 Java 中是一个元素类型为 int[]
的一维数组,该数组已经填充了 int[]
类型的子数组。由于您要用 sol[from] = /* something returning an int[] array */
覆盖元素,因此分配这些子数组已过时。所以在这种情况下,你应该使用
int[][] sol = new int[h][];
相反。但是认识到外部数组的一维性质也允许认识到一个简单的流解决方案就可以完成这项工作,即
int[][] sol = IntStream.range(yStart, yStart+h)
.parallel()
.mapToObj(y -> getRealRGB(xStart, y, w, 1, null, 0, scansize))
.toArray(int[][]::new);
这已经完成了在可用内核上分配工作负载的工作。它在幕后使用 Fork/Join 框架,就像您尝试做的那样,但这是一个实现细节。您可以将其与下一个流操作融合,例如
return IntStream.range(yStart, yStart+h)
.parallel()
.flatMap(y -> Arrays.stream(getRealRGB(xStart, y, w, 1, null, 0, scansize)))
.toArray();
虽然,如果我正确理解方法签名,你实际上想要做
public int[] getRGB(
int xStart, int yStart, int w, int h, int[] rgbArray, int offset, int scansize) {
final int[] result = rgbArray!=null? rgbArray: new int[offset+h*scansize];
IntStream.range(yStart, yStart+h).parallel()
.forEach(y -> getRealRGB(xStart, y, w, 1, result, offset+y*scansize, scansize));
return result;
}
履行合同。这也最大限度地减少了复制操作的次数。由于每个查询写入数组的不同区域,直接写入目标数组是线程安全的。
这保留了仅拆分行范围的策略。行的子拆分是可能的,但更复杂,而且很少有回报。它只会在调用者请求的行数很少但每行有很多值的极端情况下有所帮助。但即便如此,由于内存局部性问题,尚不清楚复杂的子行拆分是否会得到回报。
关于你原来的问题,如果你直接实现一个ForkJoinTask
,你可以用getSurplusQueuedTaskCount()
来决定是重新拆分还是直接计算。
阈值的选择是由于必须同步的任务对象的数量和核心利用率之间的开销之间的权衡。如果工作负载可以完美平衡地分配,并且没有其他不相关的线程或进程使用 CPU 时间,那么每个核心有一个项目将是完美的。但在实践中,这些任务永远不会 运行 完全相同,因此最好让那些先完成的核心执行一些备用的拆分任务。典型的阈值介于 1 或 3 之间(请记住,这是每个核心排队任务的数量 ),对于您的任务类型,具有非常均匀的工作负载,可以使用较小的数量,例如一旦有另一个队列项就停止拆分。
我正在尝试使用 java 中的 ForkJoinPool 处理图像。我使用流对图像进行一些自定义操作。我正在尝试将 ForkJoinPool 用于 getRGB
和 setRGB
方法。如何在 getRGB
方法上实现并行性?
@Override
public int[] getRGB(int xStart, int yStart, int w, int h, int[] rgbArray,int offset, int scansize) {
int[][] sol = new int[h][w];
int threshold = w;
class RecursiveSetter extends RecursiveAction {
int from;
int to;
FJBufferedImage image;
RecursiveSetter(int from, int to, FJBufferedImage image) {
this.from = from;
this.to = to;
this.image = image;
}
@Override
protected void compute() {
System.out.println("From : " + from + " To : " + to);
if (from >= to) return;
if (to - from == 1) {
computeDirectly(from);
return;
} else {
int mid = from + (to - from) / 2;
System.out.println("From : " + from + " To : " + to +
"Mid :" + mid);
invokeAll(
new RecursiveSetter(from, mid, image),
new RecursiveSetter(mid + 1, to, image));
return;
}
}
void computeDirectly(int row) {
sol[from] = image.getRealRGB(from, 0, w, 1, null, offset,
scansize);
}
}
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.invoke(new RecursiveSetter(0, h-1, this));
return Arrays.stream(sol)
.flatMapToInt(Arrays::stream)
.toArray();
}
getRealRGB
只是代理了BufferedImage
的方法。我知道这可能不切实际,但我只想知道如何在这种情况下使用 ForkJoinPool。是的,上面的代码抛出 ArrayIndexOutOfBound
异常。请就如何拆分工作负载(行、列、小网格。现在,我正在按行拆分)以及如何确定阈值提出建议。
首先对您的尝试发表一些评论:
int[][] sol = new int[h][w];
这里您正在创建一个二维数组,它在 Java 中是一个元素类型为 int[]
的一维数组,该数组已经填充了 int[]
类型的子数组。由于您要用 sol[from] = /* something returning an int[] array */
覆盖元素,因此分配这些子数组已过时。所以在这种情况下,你应该使用
int[][] sol = new int[h][];
相反。但是认识到外部数组的一维性质也允许认识到一个简单的流解决方案就可以完成这项工作,即
int[][] sol = IntStream.range(yStart, yStart+h)
.parallel()
.mapToObj(y -> getRealRGB(xStart, y, w, 1, null, 0, scansize))
.toArray(int[][]::new);
这已经完成了在可用内核上分配工作负载的工作。它在幕后使用 Fork/Join 框架,就像您尝试做的那样,但这是一个实现细节。您可以将其与下一个流操作融合,例如
return IntStream.range(yStart, yStart+h)
.parallel()
.flatMap(y -> Arrays.stream(getRealRGB(xStart, y, w, 1, null, 0, scansize)))
.toArray();
虽然,如果我正确理解方法签名,你实际上想要做
public int[] getRGB(
int xStart, int yStart, int w, int h, int[] rgbArray, int offset, int scansize) {
final int[] result = rgbArray!=null? rgbArray: new int[offset+h*scansize];
IntStream.range(yStart, yStart+h).parallel()
.forEach(y -> getRealRGB(xStart, y, w, 1, result, offset+y*scansize, scansize));
return result;
}
履行合同。这也最大限度地减少了复制操作的次数。由于每个查询写入数组的不同区域,直接写入目标数组是线程安全的。
这保留了仅拆分行范围的策略。行的子拆分是可能的,但更复杂,而且很少有回报。它只会在调用者请求的行数很少但每行有很多值的极端情况下有所帮助。但即便如此,由于内存局部性问题,尚不清楚复杂的子行拆分是否会得到回报。
关于你原来的问题,如果你直接实现一个ForkJoinTask
,你可以用getSurplusQueuedTaskCount()
来决定是重新拆分还是直接计算。
阈值的选择是由于必须同步的任务对象的数量和核心利用率之间的开销之间的权衡。如果工作负载可以完美平衡地分配,并且没有其他不相关的线程或进程使用 CPU 时间,那么每个核心有一个项目将是完美的。但在实践中,这些任务永远不会 运行 完全相同,因此最好让那些先完成的核心执行一些备用的拆分任务。典型的阈值介于 1 或 3 之间(请记住,这是每个核心排队任务的数量 ),对于您的任务类型,具有非常均匀的工作负载,可以使用较小的数量,例如一旦有另一个队列项就停止拆分。