从 Java 并行流产生其他并行流并且很少失败

From Java parallelstream spawns other parallelStreams and fails seldom

考虑以下功能:

public void execute4() {
        File filePath = new File(filePathData);
        File[] files = filePath.listFiles((File filePathData) -> filePathData.getName().endsWith("CDR"));
        List<CDR> cdrs = new ArrayList<CDR>();
        Arrays.asList(files).parallelStream().forEach(file -> readCDRP(cdrs, file));
        cdrs.sort(cdrsorter);
    }

读取包含 CDR 的文件列表并执行 readCDRP(),即:

private void readCDRP(List<CDR> cdrs, File file) {
    final CDR cdr = new CDR(file.getName());
    try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
        List<String> lines = bfr.lines().collect(Collectors.toList());
        lines.parallelStream().forEach(e -> {
            String[] data = e.split(",", -1);
            CDREntry entry = new CDREntry(file.getName());
            for (int i = 0; i < data.length; i++) {
                entry.setField(i, data[i]);
            }
            cdr.addEntry(entry);
        });

        if (cdr != null) {
            cdrs.add(cdr);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

我观察到的是,偶尔而不是一直,我要么在 readCDRP 函数中得到一个 ArrayIndexNotBound 异常(这很尴尬,因为 cdr 的列表是一个 ArrayList() ):

cdr.addEntry(entry);

或 在我应用排序的 execute4() 的最后一行。

我认为问题是来自 execute4 的第一个 parallelStream 在内存中与 readCDRP() 中的第二个 parallelStream 执行不在单独的 space 中,并且似乎也错误地共享了数据。使用 "seem" 这个词,因为我无法确认,只是一个 hutch。

问题是: 从 JDK8 的角度来看,我的代码有问题吗? 是否有使用相同流程的解决方法,例如使用 CountDownLatch? ForkJoinPool 有限制吗?

感谢您的回复....

编辑(1): addEntry 是 class 本身的一部分:

class CDR {
        public final String fileName;
        private final List<CDREntry> entries = new ArrayList<CDREntry>();

        public CDR(String fileName) {
            super();
            this.fileName = fileName;
        }

        public List<CDREntry> getEntries() {
            return entries;
        }

        public List<CDREntry> addEntry(CDREntry e) {
            entries.add(e);
            return entries;
        }

        public String getFileName() {
            return this.fileName;
        }
    }

从线程安全的角度来看,您的代码是错误的。在 readCDR 中,您将元素添加到 cdrs 列表中,这是一个 ArrayList 不支持并发写入的列表。这就是它崩溃的原因。

更好的方法是让 readCDR return 一个 cdr 对象并执行如下操作:

 List<CDR> cdrs = Arrays.stream(files)
                        .parallel()
                        .map(this::readCDR)
                        .collect(Collectors.toList());

此外,对 IO 相关操作使用并行流通常不是一个好主意,但那是另一个讨论。

您正在使用并行流和具有副作用的 lambda (lambda 更新 ArrayList 'cdrs') 尝试使用收集器或归约操作。

当您开始以函数式风格编程时,您应该更喜欢不可变对象,它们可以通过构造(或者可能使用构建器模式或某些工厂方法)完全创建。所以你的 CDREntry class 可能看起来像这样:

class CDREntry {
    private final String[] fields;
    private final String name;

    public CDREntry(String name, String[] fields) {
        this.name = name;
        this.fields = fields;
    }
    // Add getters and whatever
}

你的 CDR class 可能看起来像这样:

class CDR {
    private final String fileName;
    private final List<CDREntry> entries;

    public CDR(String fileName, List<CDREntry> entries) {
        this.fileName = fileName;
        this.entries = entries;
    }

    public List<CDREntry> getEntries() {
        return entries;
    }

    public String getFileName() {
        return this.fileName;
    }
}

有了这样的 classes 事情变得容易了。其余代码可以这样重写:

public void execute4() {
    File filePath = new File(filePathData);
    File[] files = filePath.listFiles((File data, String name) -> 
             data.getName().endsWith("CDR")); // fixed this line: it had compilation error
    List<CDR> cdrs = Arrays.stream(files).parallel()
            .map(this::readCDRP).sorted(cdrsorter)
            .collect(Collectors.toList());
}

private CDR readCDRP(File file) {
    try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
        // I'm not sure that collecting lines into list 
        // before main processing was actually necessary
        return bfr.lines().parallelStream()
                .map(e -> new CDREntry(file.getName(), e.split(",", -1)))
                .collect(Collectors.collectingAndThen(
                        Collectors.toList(), list -> new CDR(file.getName(), list)));
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

总的来说,请记住 forEach 通常不是解决任务的最干净的方法。当您将流集成到遗留代码中时,这可能会有所帮助,但通常应该避免。