如何在 Java 中构建一个非常简单的多线程管道?

How to build a very simple multithreaded pipeline in Java?

我正在尝试 运行 在多线程管道中的矩阵上使用一些过滤器以节省一些时间。可悲的是,我还没有完全掌握明显的同步问题。

我期望的是我可以将一个矩阵输入我的 step() 函数并接收一个包含所有过滤器的矩阵(由管道长度延迟)。

    volatile private short[][] stage1matrix, stage2matrix;

    public short[][] step(short[][] matrix) {

        short[][] res = stage2matrix; // stage matrix with all applied filters for output
        stage2matrix = stage1matrix; // take matrix with applied stage 1 filters for stage 2
        stage1matrix = matrix; // stage input matrix for stage 1 filters

        Thread stage2 = new Thread(() -> {
            applyArtifactFilter(stage2matrix);
        });
        stage2.setPriority(10);
        stage2.start();

        Thread stage1 = new Thread(() -> {
            applySaltFilter(stage1matrix);
            applyLineFilter(stage1matrix);
            applyMedFilter(stage1matrix);
            applyPreArtifactFilter(stage1matrix);
        });
        stage1.start();

        try {
            stage1.join();
            stage2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return res; // output matrix with all applied filters
    }
}

此外,对于一些额外的(可能是不必要的)信息,我在 applyMedFilter 中以一种可能滥用的方式使用了并行性(我真的不认为这会导致问题,但是......):

        ArrayList<Integer> rowIndices = new ArrayList<Integer>();
        for (int i = kernel; i < matrix.length + kernel; i++) {
            rowIndices.add(i);
        }
        //   Move window through all elements of the image
        rowIndices.parallelStream().forEach(i -> {
            for (int j = kernel; j < matrix[0].length + kernel; j++) {
                // do some rowwise independent work in parallel
                   doSomeThingWith(matrix[i][j]);
            }
        });
    }

我不规则地从仅应用了一些过滤器的函数中获取矩阵,这是怎么回事?尤其是 applyMedFilter 有时没有应用,这是不可能发生的。有时,一切正常,有时却不正常,这一事实使我得出结论,这可能是缓存一致性问题。 另外,注释掉 applyArtifactFilter 似乎可以解决问题。

为什么这没有像我预期的那样工作,我怎样才能做得更好?

贴出的代码可以正常运行;错误是函数的输入每次都是相同的矩阵而不是副本,所以所有变量都引用相同的。