并行流只创建一个线程并像普通流一样快速给出结果
Parallel stream creates only one thread and gives result as fast as normal stream
下面是我尝试处理从并行流和普通流中的文件读取的行的代码。令人惊讶的是,并行流并没有比普通流有任何改进。我在这里错过了什么吗?
Files.walk(Paths.get(tweetFilePath + LocalDate.now())).forEach(
filePath -> {
if (Files.isRegularFile(filePath) && !filePath.toString().endsWith(".DS_Store")) {
long startTime = System.currentTimeMillis();
try {
Files.lines(filePath).parallel().forEach(line -> {
try {
System.out.println(line);
} catch (Exception e) {
System.out.println("Not able to crunch"+ e);
}
});
} catch (Exception e) {
System.out.println("Bad line in file ");
}finally {
System.out.println("total time required:" + (System.currentTimeMillis() - startTime));
}
}
});
目前看来,Files.lines
是线性读取文件,所以并行调用无法将源流拆分成子流进行并行处理。
details 见此处。相关部分引用如下:
What if my source is based on IO?
Currently, JDK IO-based Stream sources (for example
BufferedReader.lines()) are mainly geared for sequential use,
processing elements one-by-one as they arrive. Opportunities exist for
supporting highly efficient bulk processing of buffered IO, but these
currently require custom development of Stream sources, Spliterators,
and/or Collectors. Some common forms may be supported in future JDK
releases.
第一个问题是 Files.lines
并行化很糟糕,尤其是对于短于 1024 行的文件。查看 this 问题了解详情。如果您事先知道您的文件足够短,可以放入内存,那么最好先按顺序读取到 List
:
Files.readAllLines(filePath, StandardCharsets.UTF_8).parallelStream()...
我对如何 improve this 有一些想法,但它仍然不是理想的解决方案。事实上,如果您甚至无法估计输入流中的元素数量,Stream API 并行化是非常低效的。
第二个问题是你的forEach
操作。这里只使用了System.out
,所以所有的线程都会尝试写入同一个PrintStream
,争夺同一个资源,这样大部分时间都花在等待锁释放上了。在内部它使用 BufferedWriter
同步所有写入。如果您不在并行操作中使用共享资源,您可能会受益于并行化。
顺便说一下,Files.lines
在 BufferedReader
上创建了一个流。最好用 try-with-resources
语句来管理它。否则,仅当底层 FileInputStream
对象被垃圾回收时,文件才会关闭,因此您可能偶尔会遇到 "too many open files".
之类的错误
下面是我尝试处理从并行流和普通流中的文件读取的行的代码。令人惊讶的是,并行流并没有比普通流有任何改进。我在这里错过了什么吗?
Files.walk(Paths.get(tweetFilePath + LocalDate.now())).forEach(
filePath -> {
if (Files.isRegularFile(filePath) && !filePath.toString().endsWith(".DS_Store")) {
long startTime = System.currentTimeMillis();
try {
Files.lines(filePath).parallel().forEach(line -> {
try {
System.out.println(line);
} catch (Exception e) {
System.out.println("Not able to crunch"+ e);
}
});
} catch (Exception e) {
System.out.println("Bad line in file ");
}finally {
System.out.println("total time required:" + (System.currentTimeMillis() - startTime));
}
}
});
目前看来,Files.lines
是线性读取文件,所以并行调用无法将源流拆分成子流进行并行处理。
details 见此处。相关部分引用如下:
What if my source is based on IO?
Currently, JDK IO-based Stream sources (for example BufferedReader.lines()) are mainly geared for sequential use, processing elements one-by-one as they arrive. Opportunities exist for supporting highly efficient bulk processing of buffered IO, but these currently require custom development of Stream sources, Spliterators, and/or Collectors. Some common forms may be supported in future JDK releases.
第一个问题是 Files.lines
并行化很糟糕,尤其是对于短于 1024 行的文件。查看 this 问题了解详情。如果您事先知道您的文件足够短,可以放入内存,那么最好先按顺序读取到 List
:
Files.readAllLines(filePath, StandardCharsets.UTF_8).parallelStream()...
我对如何 improve this 有一些想法,但它仍然不是理想的解决方案。事实上,如果您甚至无法估计输入流中的元素数量,Stream API 并行化是非常低效的。
第二个问题是你的forEach
操作。这里只使用了System.out
,所以所有的线程都会尝试写入同一个PrintStream
,争夺同一个资源,这样大部分时间都花在等待锁释放上了。在内部它使用 BufferedWriter
同步所有写入。如果您不在并行操作中使用共享资源,您可能会受益于并行化。
顺便说一下,Files.lines
在 BufferedReader
上创建了一个流。最好用 try-with-resources
语句来管理它。否则,仅当底层 FileInputStream
对象被垃圾回收时,文件才会关闭,因此您可能偶尔会遇到 "too many open files".