从 Java 并行流产生其他并行流并且很少失败
From Java parallelstream spawns other parallelStreams and fails seldom
考虑以下功能:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File filePathData) -> filePathData.getName().endsWith("CDR"));
List<CDR> cdrs = new ArrayList<CDR>();
Arrays.asList(files).parallelStream().forEach(file -> readCDRP(cdrs, file));
cdrs.sort(cdrsorter);
}
读取包含 CDR 的文件列表并执行 readCDRP(),即:
private void readCDRP(List<CDR> cdrs, File file) {
final CDR cdr = new CDR(file.getName());
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
List<String> lines = bfr.lines().collect(Collectors.toList());
lines.parallelStream().forEach(e -> {
String[] data = e.split(",", -1);
CDREntry entry = new CDREntry(file.getName());
for (int i = 0; i < data.length; i++) {
entry.setField(i, data[i]);
}
cdr.addEntry(entry);
});
if (cdr != null) {
cdrs.add(cdr);
}
} catch (IOException e) {
e.printStackTrace();
}
}
我观察到的是,偶尔而不是一直,我要么在 readCDRP 函数中得到一个 ArrayIndexNotBound 异常(这很尴尬,因为 cdr 的列表是一个 ArrayList() ):
cdr.addEntry(entry);
或
在我应用排序的 execute4() 的最后一行。
我认为问题是来自 execute4 的第一个 parallelStream 在内存中与 readCDRP() 中的第二个 parallelStream 执行不在单独的 space 中,并且似乎也错误地共享了数据。使用 "seem" 这个词,因为我无法确认,只是一个 hutch。
问题是:
从 JDK8 的角度来看,我的代码有问题吗?
是否有使用相同流程的解决方法,例如使用 CountDownLatch?
ForkJoinPool 有限制吗?
感谢您的回复....
编辑(1):
addEntry 是 class 本身的一部分:
class CDR {
public final String fileName;
private final List<CDREntry> entries = new ArrayList<CDREntry>();
public CDR(String fileName) {
super();
this.fileName = fileName;
}
public List<CDREntry> getEntries() {
return entries;
}
public List<CDREntry> addEntry(CDREntry e) {
entries.add(e);
return entries;
}
public String getFileName() {
return this.fileName;
}
}
从线程安全的角度来看,您的代码是错误的。在 readCDR
中,您将元素添加到 cdrs
列表中,这是一个 ArrayList
不支持并发写入的列表。这就是它崩溃的原因。
更好的方法是让 readCDR
return 一个 cdr
对象并执行如下操作:
List<CDR> cdrs = Arrays.stream(files)
.parallel()
.map(this::readCDR)
.collect(Collectors.toList());
此外,对 IO 相关操作使用并行流通常不是一个好主意,但那是另一个讨论。
您正在使用并行流和具有副作用的 lambda
(lambda 更新 ArrayList 'cdrs')
尝试使用收集器或归约操作。
当您开始以函数式风格编程时,您应该更喜欢不可变对象,它们可以通过构造(或者可能使用构建器模式或某些工厂方法)完全创建。所以你的 CDREntry
class 可能看起来像这样:
class CDREntry {
private final String[] fields;
private final String name;
public CDREntry(String name, String[] fields) {
this.name = name;
this.fields = fields;
}
// Add getters and whatever
}
你的 CDR
class 可能看起来像这样:
class CDR {
private final String fileName;
private final List<CDREntry> entries;
public CDR(String fileName, List<CDREntry> entries) {
this.fileName = fileName;
this.entries = entries;
}
public List<CDREntry> getEntries() {
return entries;
}
public String getFileName() {
return this.fileName;
}
}
有了这样的 classes 事情变得容易了。其余代码可以这样重写:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File data, String name) ->
data.getName().endsWith("CDR")); // fixed this line: it had compilation error
List<CDR> cdrs = Arrays.stream(files).parallel()
.map(this::readCDRP).sorted(cdrsorter)
.collect(Collectors.toList());
}
private CDR readCDRP(File file) {
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
// I'm not sure that collecting lines into list
// before main processing was actually necessary
return bfr.lines().parallelStream()
.map(e -> new CDREntry(file.getName(), e.split(",", -1)))
.collect(Collectors.collectingAndThen(
Collectors.toList(), list -> new CDR(file.getName(), list)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
总的来说,请记住 forEach
通常不是解决任务的最干净的方法。当您将流集成到遗留代码中时,这可能会有所帮助,但通常应该避免。
考虑以下功能:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File filePathData) -> filePathData.getName().endsWith("CDR"));
List<CDR> cdrs = new ArrayList<CDR>();
Arrays.asList(files).parallelStream().forEach(file -> readCDRP(cdrs, file));
cdrs.sort(cdrsorter);
}
读取包含 CDR 的文件列表并执行 readCDRP(),即:
private void readCDRP(List<CDR> cdrs, File file) {
final CDR cdr = new CDR(file.getName());
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
List<String> lines = bfr.lines().collect(Collectors.toList());
lines.parallelStream().forEach(e -> {
String[] data = e.split(",", -1);
CDREntry entry = new CDREntry(file.getName());
for (int i = 0; i < data.length; i++) {
entry.setField(i, data[i]);
}
cdr.addEntry(entry);
});
if (cdr != null) {
cdrs.add(cdr);
}
} catch (IOException e) {
e.printStackTrace();
}
}
我观察到的是,偶尔而不是一直,我要么在 readCDRP 函数中得到一个 ArrayIndexNotBound 异常(这很尴尬,因为 cdr 的列表是一个 ArrayList() ):
cdr.addEntry(entry);
或 在我应用排序的 execute4() 的最后一行。
我认为问题是来自 execute4 的第一个 parallelStream 在内存中与 readCDRP() 中的第二个 parallelStream 执行不在单独的 space 中,并且似乎也错误地共享了数据。使用 "seem" 这个词,因为我无法确认,只是一个 hutch。
问题是: 从 JDK8 的角度来看,我的代码有问题吗? 是否有使用相同流程的解决方法,例如使用 CountDownLatch? ForkJoinPool 有限制吗?
感谢您的回复....
编辑(1): addEntry 是 class 本身的一部分:
class CDR {
public final String fileName;
private final List<CDREntry> entries = new ArrayList<CDREntry>();
public CDR(String fileName) {
super();
this.fileName = fileName;
}
public List<CDREntry> getEntries() {
return entries;
}
public List<CDREntry> addEntry(CDREntry e) {
entries.add(e);
return entries;
}
public String getFileName() {
return this.fileName;
}
}
从线程安全的角度来看,您的代码是错误的。在 readCDR
中,您将元素添加到 cdrs
列表中,这是一个 ArrayList
不支持并发写入的列表。这就是它崩溃的原因。
更好的方法是让 readCDR
return 一个 cdr
对象并执行如下操作:
List<CDR> cdrs = Arrays.stream(files)
.parallel()
.map(this::readCDR)
.collect(Collectors.toList());
此外,对 IO 相关操作使用并行流通常不是一个好主意,但那是另一个讨论。
您正在使用并行流和具有副作用的 lambda (lambda 更新 ArrayList 'cdrs') 尝试使用收集器或归约操作。
当您开始以函数式风格编程时,您应该更喜欢不可变对象,它们可以通过构造(或者可能使用构建器模式或某些工厂方法)完全创建。所以你的 CDREntry
class 可能看起来像这样:
class CDREntry {
private final String[] fields;
private final String name;
public CDREntry(String name, String[] fields) {
this.name = name;
this.fields = fields;
}
// Add getters and whatever
}
你的 CDR
class 可能看起来像这样:
class CDR {
private final String fileName;
private final List<CDREntry> entries;
public CDR(String fileName, List<CDREntry> entries) {
this.fileName = fileName;
this.entries = entries;
}
public List<CDREntry> getEntries() {
return entries;
}
public String getFileName() {
return this.fileName;
}
}
有了这样的 classes 事情变得容易了。其余代码可以这样重写:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File data, String name) ->
data.getName().endsWith("CDR")); // fixed this line: it had compilation error
List<CDR> cdrs = Arrays.stream(files).parallel()
.map(this::readCDRP).sorted(cdrsorter)
.collect(Collectors.toList());
}
private CDR readCDRP(File file) {
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
// I'm not sure that collecting lines into list
// before main processing was actually necessary
return bfr.lines().parallelStream()
.map(e -> new CDREntry(file.getName(), e.split(",", -1)))
.collect(Collectors.collectingAndThen(
Collectors.toList(), list -> new CDR(file.getName(), list)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
总的来说,请记住 forEach
通常不是解决任务的最干净的方法。当您将流集成到遗留代码中时,这可能会有所帮助,但通常应该避免。