并发执行:未来与并行流
Concurrent Execution: Future vs parallelstream
我编写了一个可调用程序,用于轮询远程客户端的信息,returns 该信息以列表形式显示。我正在使用线程池执行器、for 循环和 Future 来针对多个远程客户端并行执行任务。然后我将所有 Future 列表与 addAll() 结合起来并使用巨大的组合列表。
我的问题是,在这里使用 parallelstream() 会比使用 future 和 for 循环更有效吗?编码当然更容易!如果我走那条路,我还需要线程池执行器吗?
谢谢!
for(SiteInfo site : active_sites) {
TAG_SCANNER scanr = new TAG_SCANNER(site, loggr);
Future<List<TagInfo>> result = threadmaker.submit(scanr);
//SOUND THE ALARMS
try {
alarm_tags.addAll(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
可能的解决方案代码? Netbeans 提出了类似的建议
active_sites.parallelstream().map((site) -> new TAG_SCANNER(site, loggr)).map((scanr) -> threadmaker.submit(scanr)).forEach((result) -> {
//SOUND THE ALARMS
try {
alarm_tags.addAll(result.get());
}
catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
一般来说,parallelstream
是由非常聪明的程序员编写的,可以非常有效地进行并行处理。
与所有其他 java 线程一样,例如并发包,除非您是该主题的专家,否则如果您自己编写它,您可能会:
- 运行 较慢
- 引入错误
- 有更多complex/harder到follow/etc代码
换句话说:是的,使用并行流。
这里有几个误解。首先,使用异步任务并不能提高你的资源利用率,如果你在提交任务后立即调用Future.get
,立即等待它完成再提交下一个任务。
其次,Netbeans 进行的代码转换产生了大部分相同的代码,仍然向 Executor
提交任务,因此这不是“未来与并行流”的问题,因为您只是执行提交(并等待) 与并行流并仍在使用执行程序。由于您的第一个错误,并行执行可能会提高吞吐量,但除此之外,将两个错误组合起来让它们相互抵消绝不是一个好主意,这仍然是一个糟糕的解决方案:
Stream API 的标准实现针对 CPU 绑定任务进行了优化,创建了与 CPU 核心数量相匹配的线程数量,并且在这些线程出现时不会产生新线程线程在等待操作中被阻塞。因此,使用并行流来执行 I/O 操作,或者通常可能等待的操作,不是一个好的选择。而且您无法控制实现所使用的线程。
更好的选择是继续使用 ExecutorService
,您可以根据您对远程客户端的预期 I/O 带宽进行配置。但是你应该修复提交后立即等待的错误,先提交所有任务,然后等待所有任务完成。请注意,您可以为此使用流 API,不是为了更好的并行性,而是可能提高可读性:
// first, submit all tasks, assuming "threadmaker" is an ExecutorService
List<Future<List<TagInfo>>> futures=threadmaker.invokeAll(
active_sites.stream()
.map(site -> new TAG_SCANNER(site, loggr))
.collect(Collectors.toList())
);
// now fetch all results
for(Future<List<TagInfo>> result: futures) {
//SOUND THE ALARMS
try {
alarm_tags.addAll(result.get());
} catch (InterruptedException | ExecutionException e) {
// not a recommended way of handling
// but I keep your code here for simplicity
e.printStackTrace();
}
}
请注意,此处使用的流 API 是 顺序 并且仅用于将 SiteInfo
列表转换为 Callable<List<TagInfo>>
列表,但是你可以使用循环来做同样的事情。
我编写了一个可调用程序,用于轮询远程客户端的信息,returns 该信息以列表形式显示。我正在使用线程池执行器、for 循环和 Future 来针对多个远程客户端并行执行任务。然后我将所有 Future 列表与 addAll() 结合起来并使用巨大的组合列表。
我的问题是,在这里使用 parallelstream() 会比使用 future 和 for 循环更有效吗?编码当然更容易!如果我走那条路,我还需要线程池执行器吗?
谢谢!
for(SiteInfo site : active_sites) {
TAG_SCANNER scanr = new TAG_SCANNER(site, loggr);
Future<List<TagInfo>> result = threadmaker.submit(scanr);
//SOUND THE ALARMS
try {
alarm_tags.addAll(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
可能的解决方案代码? Netbeans 提出了类似的建议
active_sites.parallelstream().map((site) -> new TAG_SCANNER(site, loggr)).map((scanr) -> threadmaker.submit(scanr)).forEach((result) -> {
//SOUND THE ALARMS
try {
alarm_tags.addAll(result.get());
}
catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
一般来说,parallelstream
是由非常聪明的程序员编写的,可以非常有效地进行并行处理。
与所有其他 java 线程一样,例如并发包,除非您是该主题的专家,否则如果您自己编写它,您可能会:
- 运行 较慢
- 引入错误
- 有更多complex/harder到follow/etc代码
换句话说:是的,使用并行流。
这里有几个误解。首先,使用异步任务并不能提高你的资源利用率,如果你在提交任务后立即调用Future.get
,立即等待它完成再提交下一个任务。
其次,Netbeans 进行的代码转换产生了大部分相同的代码,仍然向 Executor
提交任务,因此这不是“未来与并行流”的问题,因为您只是执行提交(并等待) 与并行流并仍在使用执行程序。由于您的第一个错误,并行执行可能会提高吞吐量,但除此之外,将两个错误组合起来让它们相互抵消绝不是一个好主意,这仍然是一个糟糕的解决方案:
Stream API 的标准实现针对 CPU 绑定任务进行了优化,创建了与 CPU 核心数量相匹配的线程数量,并且在这些线程出现时不会产生新线程线程在等待操作中被阻塞。因此,使用并行流来执行 I/O 操作,或者通常可能等待的操作,不是一个好的选择。而且您无法控制实现所使用的线程。
更好的选择是继续使用 ExecutorService
,您可以根据您对远程客户端的预期 I/O 带宽进行配置。但是你应该修复提交后立即等待的错误,先提交所有任务,然后等待所有任务完成。请注意,您可以为此使用流 API,不是为了更好的并行性,而是可能提高可读性:
// first, submit all tasks, assuming "threadmaker" is an ExecutorService
List<Future<List<TagInfo>>> futures=threadmaker.invokeAll(
active_sites.stream()
.map(site -> new TAG_SCANNER(site, loggr))
.collect(Collectors.toList())
);
// now fetch all results
for(Future<List<TagInfo>> result: futures) {
//SOUND THE ALARMS
try {
alarm_tags.addAll(result.get());
} catch (InterruptedException | ExecutionException e) {
// not a recommended way of handling
// but I keep your code here for simplicity
e.printStackTrace();
}
}
请注意,此处使用的流 API 是 顺序 并且仅用于将 SiteInfo
列表转换为 Callable<List<TagInfo>>
列表,但是你可以使用循环来做同样的事情。