如何处理超过默认线程数的 Java 流?
How can I process a Java stream with more than the default number of threads?
默认情况下 Java 流由 common thread pool, which is constructed with default parameters. As has been answered in another question 处理,可以通过指定自定义池或设置 java.util.concurrent.ForkJoinPool.common.parallelism
系统参数来调整这些默认值。
但是,我一直无法通过这两种方法中的任何一种来增加分配给流处理的线程数。例如,考虑下面的程序,它处理包含在其第一个参数中指定的文件中的 IP 地址列表,并输出解析的地址。 运行 这在一个具有大约 13000 个唯一 IP 地址的文件上,我看到使用 Oracle Java Mission Control 少至 16 个线程。其中,只有 5 人是 ForkJoinPool
工人。然而,这个特定的任务会受益于更多的线程,因为线程大部分时间都在等待 DNS 响应。所以我的问题是,我怎样才能真正增加使用的线程数?
我已经在三个环境中试用了该程序;这些是 OS- 报告的线程数。
- Java SE Runtime Environment build 1.8.0_73-b02 在 8 核机器上 运行 Windows 7: 17 个线程
- Java SE Runtime Environment build 1.8.0_66-b17 在 2 核机器上 运行 OS X Darwin 15.2.0:23 个线程
- openjdk 版本 1.8.0_72 在 24 核机器上 运行 FreeBSD 11.0:44 个线程
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;
/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
/** Resolve the passed IP address into a name */
static String addressName(String ipAddress) {
try {
return InetAddress.getByName(ipAddress).getHostName();
} catch (UnknownHostException e) {
return ipAddress;
}
}
public static void main(String[] args) {
Path path = Paths.get(args[0]);
ForkJoinPool fjp = new ForkJoinPool(100);
try {
fjp.submit(() -> {
try {
Files.lines(path)
.parallel()
.map(line -> addressName(line))
.forEach(System.out::println);
} catch (IOException e) {
System.err.println("Failed: " + e);
}
}).get();
} catch (Exception e) {
System.err.println("Failed: " + e);
}
}
}
您的方法有两个问题。首先是使用自定义 FJP 不会更改流创建的单个任务的最大数量 API,因为这是定义的 in the following way:
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
因此,即使您使用自定义池,并行任务的数量也会受到 commonPoolParallelism * 4
的限制。 (其实不是硬性限制,而是一个目标,但很多情况下任务数等于这个数)。
使用java.util.concurrent.ForkJoinPool.common.parallelism
系统属性可以解决上述问题,但是这里遇到了另一个问题:Files.lines
并行化非常糟糕。请参阅 this question for details. In particular, for 13000 input lines the maximal possible speedup is 3.17x (assuming that every line processing takes roughly the same time) even if you have 100 CPUs. My StreamEx 库为此提供了解决方法(使用 StreamEx.ofLines(path).parallel()
创建流)。另一种可能的解决方案是将文件行顺序读入 List
,然后从中创建一个并行流:
Files.readAllLines(path).parallelStream()...
这将与系统一起工作 属性。但是,一般来说,当任务涉及 I/O 时,Stream API 不太适合并行处理。更灵活的解决方案是对每一行使用 CompletableFuture
:
ForkJoinPool fjp = new ForkJoinPool(100);
List<CompletableFuture<String>> list = Files.lines(path)
.map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
.collect(Collectors.toList());
list.stream().map(CompletableFuture::join)
.forEach(System.out::println);
这样您就不需要调整系统 属性 并且可以为单独的任务使用单独的池。
默认情况下 Java 流由 common thread pool, which is constructed with default parameters. As has been answered in another question 处理,可以通过指定自定义池或设置 java.util.concurrent.ForkJoinPool.common.parallelism
系统参数来调整这些默认值。
但是,我一直无法通过这两种方法中的任何一种来增加分配给流处理的线程数。例如,考虑下面的程序,它处理包含在其第一个参数中指定的文件中的 IP 地址列表,并输出解析的地址。 运行 这在一个具有大约 13000 个唯一 IP 地址的文件上,我看到使用 Oracle Java Mission Control 少至 16 个线程。其中,只有 5 人是 ForkJoinPool
工人。然而,这个特定的任务会受益于更多的线程,因为线程大部分时间都在等待 DNS 响应。所以我的问题是,我怎样才能真正增加使用的线程数?
我已经在三个环境中试用了该程序;这些是 OS- 报告的线程数。
- Java SE Runtime Environment build 1.8.0_73-b02 在 8 核机器上 运行 Windows 7: 17 个线程
- Java SE Runtime Environment build 1.8.0_66-b17 在 2 核机器上 运行 OS X Darwin 15.2.0:23 个线程
- openjdk 版本 1.8.0_72 在 24 核机器上 运行 FreeBSD 11.0:44 个线程
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;
/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
/** Resolve the passed IP address into a name */
static String addressName(String ipAddress) {
try {
return InetAddress.getByName(ipAddress).getHostName();
} catch (UnknownHostException e) {
return ipAddress;
}
}
public static void main(String[] args) {
Path path = Paths.get(args[0]);
ForkJoinPool fjp = new ForkJoinPool(100);
try {
fjp.submit(() -> {
try {
Files.lines(path)
.parallel()
.map(line -> addressName(line))
.forEach(System.out::println);
} catch (IOException e) {
System.err.println("Failed: " + e);
}
}).get();
} catch (Exception e) {
System.err.println("Failed: " + e);
}
}
}
您的方法有两个问题。首先是使用自定义 FJP 不会更改流创建的单个任务的最大数量 API,因为这是定义的 in the following way:
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
因此,即使您使用自定义池,并行任务的数量也会受到 commonPoolParallelism * 4
的限制。 (其实不是硬性限制,而是一个目标,但很多情况下任务数等于这个数)。
使用java.util.concurrent.ForkJoinPool.common.parallelism
系统属性可以解决上述问题,但是这里遇到了另一个问题:Files.lines
并行化非常糟糕。请参阅 this question for details. In particular, for 13000 input lines the maximal possible speedup is 3.17x (assuming that every line processing takes roughly the same time) even if you have 100 CPUs. My StreamEx 库为此提供了解决方法(使用 StreamEx.ofLines(path).parallel()
创建流)。另一种可能的解决方案是将文件行顺序读入 List
,然后从中创建一个并行流:
Files.readAllLines(path).parallelStream()...
这将与系统一起工作 属性。但是,一般来说,当任务涉及 I/O 时,Stream API 不太适合并行处理。更灵活的解决方案是对每一行使用 CompletableFuture
:
ForkJoinPool fjp = new ForkJoinPool(100);
List<CompletableFuture<String>> list = Files.lines(path)
.map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
.collect(Collectors.toList());
list.stream().map(CompletableFuture::join)
.forEach(System.out::println);
这样您就不需要调整系统 属性 并且可以为单独的任务使用单独的池。