Does the call to join in the following CompletableFuture example block the process?
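I'm trying to understand CompletableFutures and the chaining of calls that return completed futures, so I created the example below, which simulates two calls to a database.
The first method is supposed to return a CompletableFuture with a list of userIds. I then need to call another method, passing it a userId, to fetch the user (a String in this case).
To summarize:
1. Fetch the ids.
2. Fetch the list of users corresponding to those ids.
I created simple methods that simulate the responses by putting the thread to sleep.
Please check the code below.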
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PipelineOfTasksExample {

    private Map<Long, String> db = new HashMap<>();

    PipelineOfTasksExample() {
        db.put(1L, "user1");
        db.put(2L, "user2");
        db.put(3L, "user3");
        db.put(4L, "user4");
    }

    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    private CompletableFuture<String> fetchById(Long id) {
        CompletableFuture<String> cfId = CompletableFuture.supplyAsync(() -> db.get(id));
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("fetching id: " + id + " -> " + db.get(id) + " thread: " + Thread.currentThread().getName());
        return cfId;
    }

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();
        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenCompose(listOfIds ->
                CompletableFuture.supplyAsync(
                    () -> listOfIds.parallelStream()
                        .map(id -> example.fetchById(id).join())
                        .collect(Collectors.toList())
                )
            );
        System.out.println(result.join());
    }
}
My question is: does the join call (example.fetchById(id).join()) ruin the non-blocking nature of the process? If the answer is yes, how can I solve this problem?
Thanks in advance.
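Your example is a bit odd: returnUserIdsFromDb() slows down the main thread before any operation has even started, and likewise fetchById slows down the caller rather than the asynchronous operation, which defeats the whole purpose of asynchronous operations.
Further, instead of .thenCompose(listOfIds -> CompletableFuture.supplyAsync(() -> …)) you can simply use .thenApplyAsync(listOfIds -> …).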
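To illustrate that the two forms produce the same result, here is a minimal sketch that runs one mapping step both ways. The class name ChainStyles and the helper describe are made up for this illustration and are not part of the question's code; both variants use the default executor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class ChainStyles {

    // Hypothetical mapping step, standing in for the real per-list work.
    static List<String> describe(List<Long> ids) {
        return ids.stream().map(id -> "user" + id).collect(Collectors.toList());
    }

    static CompletableFuture<List<Long>> ids() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    // Verbose form: wrap the work in a new future and flatten it with thenCompose.
    static CompletableFuture<List<String>> viaThenCompose() {
        return ids().thenCompose(list -> CompletableFuture.supplyAsync(() -> describe(list)));
    }

    // Compact form: thenApplyAsync schedules the function asynchronously itself.
    static CompletableFuture<List<String>> viaThenApplyAsync() {
        return ids().thenApplyAsync(ChainStyles::describe);
    }

    public static void main(String[] args) {
        System.out.println(viaThenCompose().join());    // [user1, user2, user3, user4]
        System.out.println(viaThenApplyAsync().join()); // [user1, user2, user3, user4]
    }
}
```

thenCompose remains the right tool when the function already returns a future, as fetchById does; it is only the extra supplyAsync wrapper that is unnecessary.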
So a better example might be
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PipelineOfTasksExample {

    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
        .collect(Collectors.toMap(id -> id, id -> "user" + id));

    PipelineOfTasksExample() {}

    // Simulates slow work in whichever thread calls it, then logs the pool size.
    private static <T> T slowDown(String op, T result) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(op + " -> " + result + " thread: "
            + Thread.currentThread().getName() + ", "
            + POOL.getPoolSize() + " threads");
        return result;
    }

    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        System.out.println("trigger building the list of Ids - thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
            POOL);
    }

    private CompletableFuture<String> fetchById(Long id) {
        System.out.println("trigger fetching id: " + id + " thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("fetching id: " + id, db.get(id)), POOL);
    }

    static ForkJoinPool POOL = new ForkJoinPool(2);

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();
        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenApplyAsync(listOfIds ->
                listOfIds.parallelStream()
                    .map(id -> example.fetchById(id).join()) // blocks a pool worker
                    .collect(Collectors.toList()),
                POOL
            );
        System.out.println(result.join());
    }
}
which prints something like
trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 4 thread: ForkJoinPool-1-worker-2
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-2, 4 threads
trigger fetching id: 1 thread: ForkJoinPool-1-worker-3
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-2, 4 threads
[user1, user2, user3, user4]
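The number of threads might be surprising at first glance.
The answer is that join() may block the thread, but if this happens inside a worker thread of a Fork/Join pool, the situation is detected and a new compensation thread is started to maintain the configured target parallelism.
As a special case, when the default Fork/Join pool is used, the implementation may pick up new pending tasks within the join() method, to ensure progress within the same thread.
So the code will always make progress, and there is nothing wrong with calling join() occasionally if the alternatives are much more complicated. Used excessively, though, it risks consuming too many resources. After all, the reason to use a thread pool is to limit the number of threads.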
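A stripped-down sketch of that situation: a pool with a target parallelism of 1 runs an outer task that submits an inner task to the same pool and then blocks on it with join(). The class and method names are hypothetical, and the timeout is only a safety net for the demo. Whether progress comes from a compensation thread or from the waiting worker helping out is an implementation detail that may differ between JDK versions, so the printed pool size is not guaranteed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingJoinInWorker {

    // Runs an outer task on the given pool; the outer task submits an inner
    // task to the same pool and blocks on it with join().
    static String nestedJoin(ForkJoinPool pool) {
        CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> {
            CompletableFuture<String> inner =
                CompletableFuture.supplyAsync(() -> "inner done", pool);
            return inner.join() + ", outer done"; // blocking call inside a worker thread
        }, pool);
        try {
            // The timeout is an assumption of this sketch, not something join() needs.
            return outer.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("nested join did not complete", e);
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(1); // target parallelism of one
        try {
            System.out.println(nestedJoin(pool));
            System.out.println("pool size: " + pool.getPoolSize());
        } finally {
            pool.shutdown();
        }
    }
}
```

A fixed-size pool with a single thread would deadlock on the same pattern, because the only thread would be waiting for a task that can never start.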
The alternative is to use chained dependent operations where possible.
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PipelineOfTasksExample {

    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
        .collect(Collectors.toMap(id -> id, id -> "user" + id));

    PipelineOfTasksExample() {}

    // Simulates slow work in whichever thread calls it, then logs the pool size.
    private static <T> T slowDown(String op, T result) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(op + " -> " + result + " thread: "
            + Thread.currentThread().getName() + ", "
            + POOL.getPoolSize() + " threads");
        return result;
    }

    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        System.out.println("trigger building the list of Ids - thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
            POOL);
    }

    private CompletableFuture<String> fetchById(Long id) {
        System.out.println("trigger fetching id: " + id + " thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("fetching id: " + id, db.get(id)), POOL);
    }

    static ForkJoinPool POOL = new ForkJoinPool(2);

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();
        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenComposeAsync(listOfIds -> {
                // Submit all jobs first, without waiting for any of them.
                List<CompletableFuture<String>> jobs = listOfIds.parallelStream()
                    .map(id -> example.fetchById(id))
                    .collect(Collectors.toList());
                // Collect the results only after every job has completed.
                return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                    .thenApply(_void -> jobs.stream()
                        .map(CompletableFuture::join).collect(Collectors.toList()));
            },
            POOL
        );
        System.out.println(result.join());
        System.out.println(ForkJoinPool.commonPool().getPoolSize());
    }
}
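The difference is that all asynchronous jobs are submitted first, and then a dependent action that calls join on them is scheduled to run only once all jobs have completed, so these join invocations never block. Only the final join call at the end of the main method may block the main thread.
So this prints something like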
trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 4 thread: ForkJoinPool-1-worker-1
trigger fetching id: 1 thread: ForkJoinPool-1-worker-0
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-1, 2 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-0, 2 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-1, 2 threads
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-0, 2 threads
[user1, user2, user3, user4]
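showing that no compensation threads had to be created, so the number of threads matches the configured target parallelism.
Note that if the actual work is done in a background thread rather than within the fetchById method itself, you no longer need a parallel stream, as there is no blocking join() call. For such scenarios, just using stream() will usually result in higher performance.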
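The allOf pattern with a plain stream() can be pulled into a small reusable helper. The following is only a sketch: the names FutureLists, allAsList and fetchAll are invented for illustration, and the lookup passed in main stands in for a real fetchById that does its work on a background thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

final class FutureLists {

    // Hypothetical helper: turns a list of futures into one future of a list.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> jobs) {
        return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
            // Runs only after every job has completed, so join() returns at once.
            .thenApply(ignored -> jobs.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    // Starts one asynchronous lookup per id with a sequential stream, then
    // combines the results in the order of the ids.
    static <I, R> CompletableFuture<List<R>> fetchAll(
            List<I> ids, Function<I, CompletableFuture<R>> fetch) {
        List<CompletableFuture<R>> jobs =
            ids.stream().map(fetch).collect(Collectors.toList());
        return allAsList(jobs);
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> users = fetchAll(
            Arrays.asList(1L, 2L, 3L, 4L),
            id -> CompletableFuture.supplyAsync(() -> "user" + id));
        System.out.println(users.join()); // [user1, user2, user3, user4]
    }
}
```

With such a helper, the chain from the example would read returnUserIdsFromDb().thenCompose(ids -> FutureLists.fetchAll(ids, example::fetchById)), assuming fetchById is accessible from the call site.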