如何在与前一阶段 运行 相同的线程上强制 CompletableFuture.thenApply() 到 运行?
How to force CompletableFuture.thenApply() to run on the same thread that ran the previous stage?
这是我面临的问题的简短代码版本:
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
/*
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {}
*/
//System.out.println("supplyAsync: " + Thread.currentThread().getName());
return 1;
})
.thenApply(i -> {
System.out.println("apply: " + Thread.currentThread().getName());
return i + 1;
})
.thenAccept((i) -> {
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
}
这是我得到的输出:
apply: main
accept: main
result: 2
看到 main
我很惊讶!当我取消注释 Thread.sleep()
调用甚至取消注释那里的单个 sysout
语句时,我预计会发生这样的事情:
supplyAsync: ForkJoinPool.commonPool-worker-1
apply: ForkJoinPool.commonPool-worker-1
accept: ForkJoinPool.commonPool-worker-1
result: 2
我知道 thenApplyAsync()
将确保它不会 运行 在 main
线程上,但我想避免传递供应商从线程返回的数据 运行 supplyAsync
到 运行 thenApply
和链中其他后续 then
的线程。
方法 thenApply
在调用者的线程中计算函数,因为 future 已经完成。当然,当你插入一个sleep
到supplier,未来还没有完成的时候,thenApply
就被调用了。即使是 print 语句也可能减慢供应商的速度,足以让主线程首先调用 thenApply
和 thenAccept
。但这不是可靠的行为,当重复 运行 代码时,您可能会得到不同的结果。
未来不仅不记得是哪个线程完成的,也无法告诉任意线程执行特定代码。该线程可能正忙于其他事情,完全不合作,甚至同时终止。
考虑一下
ExecutorService s = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync: " + Thread.currentThread().getName());
return 1;
}, s);
s.shutdown();
s.awaitTermination(1, TimeUnit.DAYS);
cf.thenApply(i -> {
System.out.println("apply: " + Thread.currentThread().getName());
return i + 1;
})
.thenAccept((i) -> {
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
我们如何期望传递给 thenApply
和 thenAccept
的函数在已经终止的池的工作线程中执行?
我们也可以这样写
CompletableFuture<Integer> cf = new CompletableFuture<>();
Thread t = new Thread(() -> {
System.out.println("completing: " + Thread.currentThread().getName());
cf.complete(1);
});
t.start();
t.join();
System.out.println("completer: " + t.getName() + " " + t.getState());
cf.thenApply(i -> {
System.out.println("apply: " + Thread.currentThread().getName());
return i + 1;
})
.thenAccept((i) -> {
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
这将打印类似的东西
completing: Thread-0
completer: Thread-0 TERMINATED
apply: main
accept: main
result: 2
显然,我们不能坚持让这个线程处理后续阶段。
但即使该线程是池中仍处于活动状态的工作线程,它也不知道它已经完成了一个未来,也不知道“处理后续阶段”。在 Executor
抽象之后,它刚刚从队列中接收到任意 Runnable
并在处理它之后,它继续其主循环,从队列中获取下一个 Runnable
。
因此,一旦第一个 future 完成,告诉它完成其他 future 的工作的唯一方法是将任务排队。这是在使用 thenApplyAsync
指定同一个池或在没有执行程序的情况下使用 …Async
方法执行所有操作时发生的情况,即使用默认池。
当您对所有 …Async
方法使用 single threaded executor 时,您可以确定所有操作都由同一线程执行,但它们仍会通过池的队列。即便如此,在未来已经完成的情况下,实际上是主线程将相关操作排入队列,线程安全队列和同步开销是不可避免的。
但请注意,即使您首先设法创建了依赖操作链,在单个工作线程按顺序处理它们之前,这种开销仍然存在。每个 future 的完成都是通过以线程安全的方式存储新状态来完成的,使结果对所有其他线程都可能可见,并自动检查是否同时发生了并发完成(例如取消)。然后,在执行之前,当然会以线程安全的方式获取由其他线程链接的相关操作。
所有这些具有同步语义的操作使得在具有相关 CompletableFuture
链时通过同一线程处理数据不太可能带来好处。
可能具有性能优势的实际本地处理的唯一方法是使用
CompletableFuture.runAsync(() -> {
System.out.println("supplyAsync: " + Thread.currentThread().getName());
int i = 1;
System.out.println("apply: " + Thread.currentThread().getName());
i = i + 1;
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
或者,换句话说,如果您不想分离处理,则首先不要创建分离处理阶段。
这是我面临的问题的简短代码版本:
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
/*
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {}
*/
//System.out.println("supplyAsync: " + Thread.currentThread().getName());
return 1;
})
.thenApply(i -> {
System.out.println("apply: " + Thread.currentThread().getName());
return i + 1;
})
.thenAccept((i) -> {
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
}
这是我得到的输出:
apply: main
accept: main
result: 2
看到 main
我很惊讶!当我取消注释 Thread.sleep()
调用甚至取消注释那里的单个 sysout
语句时,我预计会发生这样的事情:
supplyAsync: ForkJoinPool.commonPool-worker-1
apply: ForkJoinPool.commonPool-worker-1
accept: ForkJoinPool.commonPool-worker-1
result: 2
我知道 thenApplyAsync()
将确保它不会 运行 在 main
线程上,但我想避免传递供应商从线程返回的数据 运行 supplyAsync
到 运行 thenApply
和链中其他后续 then
的线程。
方法 thenApply
在调用者的线程中计算函数,因为 future 已经完成。当然,当你插入一个sleep
到supplier,未来还没有完成的时候,thenApply
就被调用了。即使是 print 语句也可能减慢供应商的速度,足以让主线程首先调用 thenApply
和 thenAccept
。但这不是可靠的行为,当重复 运行 代码时,您可能会得到不同的结果。
未来不仅不记得是哪个线程完成的,也无法告诉任意线程执行特定代码。该线程可能正忙于其他事情,完全不合作,甚至同时终止。
考虑一下
ExecutorService s = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync: " + Thread.currentThread().getName());
return 1;
}, s);
s.shutdown();
s.awaitTermination(1, TimeUnit.DAYS);
cf.thenApply(i -> {
System.out.println("apply: " + Thread.currentThread().getName());
return i + 1;
})
.thenAccept((i) -> {
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
我们如何期望传递给 thenApply
和 thenAccept
的函数在已经终止的池的工作线程中执行?
我们也可以这样写
CompletableFuture<Integer> cf = new CompletableFuture<>();
Thread t = new Thread(() -> {
System.out.println("completing: " + Thread.currentThread().getName());
cf.complete(1);
});
t.start();
t.join();
System.out.println("completer: " + t.getName() + " " + t.getState());
cf.thenApply(i -> {
System.out.println("apply: " + Thread.currentThread().getName());
return i + 1;
})
.thenAccept((i) -> {
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
这将打印类似的东西
completing: Thread-0
completer: Thread-0 TERMINATED
apply: main
accept: main
result: 2
显然,我们不能坚持让这个线程处理后续阶段。
但即使该线程是池中仍处于活动状态的工作线程,它也不知道它已经完成了一个未来,也不知道“处理后续阶段”。在 Executor
抽象之后,它刚刚从队列中接收到任意 Runnable
并在处理它之后,它继续其主循环,从队列中获取下一个 Runnable
。
因此,一旦第一个 future 完成,告诉它完成其他 future 的工作的唯一方法是将任务排队。这是在使用 thenApplyAsync
指定同一个池或在没有执行程序的情况下使用 …Async
方法执行所有操作时发生的情况,即使用默认池。
当您对所有 …Async
方法使用 single threaded executor 时,您可以确定所有操作都由同一线程执行,但它们仍会通过池的队列。即便如此,在未来已经完成的情况下,实际上是主线程将相关操作排入队列,线程安全队列和同步开销是不可避免的。
但请注意,即使您首先设法创建了依赖操作链,在单个工作线程按顺序处理它们之前,这种开销仍然存在。每个 future 的完成都是通过以线程安全的方式存储新状态来完成的,使结果对所有其他线程都可能可见,并自动检查是否同时发生了并发完成(例如取消)。然后,在执行之前,当然会以线程安全的方式获取由其他线程链接的相关操作。
所有这些具有同步语义的操作使得在具有相关 CompletableFuture
链时通过同一线程处理数据不太可能带来好处。
可能具有性能优势的实际本地处理的唯一方法是使用
CompletableFuture.runAsync(() -> {
System.out.println("supplyAsync: " + Thread.currentThread().getName());
int i = 1;
System.out.println("apply: " + Thread.currentThread().getName());
i = i + 1;
System.out.println("accept: " + Thread.currentThread().getName());
System.out.println("result: " + i);
}).join();
或者,换句话说,如果您不想分离处理,则首先不要创建分离处理阶段。