如何在 Java 中构建一个非常简单的多线程管道?
How to build a very simple multithreaded pipeline in Java?
我正在尝试 运行 在多线程管道中的矩阵上使用一些过滤器以节省一些时间。可悲的是,我还没有完全掌握明显的同步问题。
我期望的是我可以将一个矩阵输入我的 step() 函数并接收一个包含所有过滤器的矩阵(由管道长度延迟)。
volatile private short[][] stage1matrix, stage2matrix;
public short[][] step(short[][] matrix) {
short[][] res = stage2matrix; // stage matrix with all applied filters for output
stage2matrix = stage1matrix; // take matrix with applied stage 1 filters for stage 2
stage1matrix = matrix; // stage input matrix for stage 1 filters
Thread stage2 = new Thread(() -> {
applyArtifactFilter(stage2matrix);
});
stage2.setPriority(10);
stage2.start();
Thread stage1 = new Thread(() -> {
applySaltFilter(stage1matrix);
applyLineFilter(stage1matrix);
applyMedFilter(stage1matrix);
applyPreArtifactFilter(stage1matrix);
});
stage1.start();
try {
stage1.join();
stage2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
return res; // output matrix with all applied filters
}
}
此外,对于一些额外的(可能是不必要的)信息,我在 applyMedFilter 中以一种可能滥用的方式使用了并行性(我真的不认为这会导致问题,但是......):
ArrayList<Integer> rowIndices = new ArrayList<Integer>();
for (int i = kernel; i < matrix.length + kernel; i++) {
rowIndices.add(i);
}
// Move window through all elements of the image
rowIndices.parallelStream().forEach(i -> {
for (int j = kernel; j < matrix[0].length + kernel; j++) {
// do some rowwise independent work in parallel
doSomeThingWith(matrix[i][j]);
}
});
}
我不规则地从仅应用了一些过滤器的函数中获取矩阵,这是怎么回事?尤其是 applyMedFilter 有时没有应用,这是不可能发生的。有时,一切正常,有时却不正常,这一事实使我得出结论,这可能是缓存一致性问题。
另外,注释掉 applyArtifactFilter 似乎可以解决问题。
为什么这没有像我预期的那样工作,我怎样才能做得更好?
贴出的代码可以正常运行;错误是函数的输入每次都是相同的矩阵而不是副本,所以所有变量都引用相同的。
我正在尝试 运行 在多线程管道中的矩阵上使用一些过滤器以节省一些时间。可悲的是,我还没有完全掌握明显的同步问题。
我期望的是我可以将一个矩阵输入我的 step() 函数并接收一个包含所有过滤器的矩阵(由管道长度延迟)。
volatile private short[][] stage1matrix, stage2matrix;
public short[][] step(short[][] matrix) {
short[][] res = stage2matrix; // stage matrix with all applied filters for output
stage2matrix = stage1matrix; // take matrix with applied stage 1 filters for stage 2
stage1matrix = matrix; // stage input matrix for stage 1 filters
Thread stage2 = new Thread(() -> {
applyArtifactFilter(stage2matrix);
});
stage2.setPriority(10);
stage2.start();
Thread stage1 = new Thread(() -> {
applySaltFilter(stage1matrix);
applyLineFilter(stage1matrix);
applyMedFilter(stage1matrix);
applyPreArtifactFilter(stage1matrix);
});
stage1.start();
try {
stage1.join();
stage2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
return res; // output matrix with all applied filters
}
}
此外,对于一些额外的(可能是不必要的)信息,我在 applyMedFilter 中以一种可能滥用的方式使用了并行性(我真的不认为这会导致问题,但是......):
ArrayList<Integer> rowIndices = new ArrayList<Integer>();
for (int i = kernel; i < matrix.length + kernel; i++) {
rowIndices.add(i);
}
// Move window through all elements of the image
rowIndices.parallelStream().forEach(i -> {
for (int j = kernel; j < matrix[0].length + kernel; j++) {
// do some rowwise independent work in parallel
doSomeThingWith(matrix[i][j]);
}
});
}
我不规则地从仅应用了一些过滤器的函数中获取矩阵,这是怎么回事?尤其是 applyMedFilter 有时没有应用,这是不可能发生的。有时,一切正常,有时却不正常,这一事实使我得出结论,这可能是缓存一致性问题。 另外,注释掉 applyArtifactFilter 似乎可以解决问题。
为什么这没有像我预期的那样工作,我怎样才能做得更好?
贴出的代码可以正常运行;错误是函数的输入每次都是相同的矩阵而不是副本,所以所有变量都引用相同的。