并行流只创建一个线程并像普通流一样快速给出结果

Parallel stream creates only one thread and gives result as fast as normal stream

下面是我尝试处理从并行流和普通流中的文件读取的行的代码。令人惊讶的是,并行流并没有比普通流有任何改进。我在这里错过了什么吗?

Files.walk(Paths.get(tweetFilePath + LocalDate.now())).forEach(
            filePath -> {
                if (Files.isRegularFile(filePath) && !filePath.toString().endsWith(".DS_Store")) {
                    long startTime = System.currentTimeMillis();
                    try {

                        Files.lines(filePath).parallel().forEach(line -> {
                                try {
                                    System.out.println(line);

                                } catch (Exception e) {
                                    System.out.println("Not able to crunch"+ e);
                                }

                        });
                    } catch (Exception e) {
                        System.out.println("Bad line in file ");
                    }finally {
                        System.out.println("total time required:" + (System.currentTimeMillis() - startTime));

                    }   
                }
            });

目前看来,Files.lines是线性读取文件,所以并行调用无法将源流拆分成子流进行并行处理。

details 见此处。相关部分引用如下:

What if my source is based on IO?

Currently, JDK IO-based Stream sources (for example BufferedReader.lines()) are mainly geared for sequential use, processing elements one-by-one as they arrive. Opportunities exist for supporting highly efficient bulk processing of buffered IO, but these currently require custom development of Stream sources, Spliterators, and/or Collectors. Some common forms may be supported in future JDK releases.

第一个问题是 Files.lines 并行化很糟糕,尤其是对于短于 1024 行的文件。查看 this 问题了解详情。如果您事先知道您的文件足够短,可以放入内存,那么最好先按顺序读取到 List

Files.readAllLines(filePath, StandardCharsets.UTF_8).parallelStream()...

我对如何 improve this 有一些想法,但它仍然不是理想的解决方案。事实上,如果您甚至无法估计输入流中的元素数量,Stream API 并行化是非常低效的。

第二个问题是你的forEach操作。这里只使用了System.out,所以所有的线程都会尝试写入同一个PrintStream,争夺同一个资源,这样大部分时间都花在等待锁释放上了。在内部它使用 BufferedWriter 同步所有写入。如果您不在并行操作中使用共享资源,您可能会受益于并行化。

顺便说一下,Files.linesBufferedReader 上创建了一个流。最好用 try-with-resources 语句来管理它。否则,仅当底层 FileInputStream 对象被垃圾回收时,文件才会关闭,因此您可能偶尔会遇到 "too many open files".

之类的错误