如何保证 CompletableFuture 在执行一段代码之前完全结束?
How to ensure that CompletableFuture is completely finished before execute a piece of code?
我有这段代码,我有两个问题。
- 我不确定为什么会看到 TimeoutException,因为任何地方都没有阻塞。
- 我想用
Collector
实现的是我有一个 class 将进入收集一堆指标,然后 CompletableFuture
完全完成我会执行 Collector
来发布指标。 finally
是否保证它会最后执行,因为我认为 .get()
应该被阻止直到它完成?
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
public class FutureWithCollector
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
Collector collector = new Collector();
finalize(() -> query(collector), collector);
}
private static void finalize(Supplier<CompletableFuture<String>> submission, Collector collector) throws ExecutionException, InterruptedException
{
ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture<String> s = CompletableFuture.supplyAsync(submission, executorService).thenCompose(Function.identity());
try {
String result = s.get(1000, TimeUnit.MILLISECONDS);
System.out.println(result);
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
System.out.println(collector.getI());
}
}
private static CompletableFuture<String> query(Collector collector)
{
CompletableFuture<String> future = new CompletableFuture<>();
return future.thenApply(r -> {
collector.collectStuff();
return "Hello";
});
}
}
class Collector
{
private volatile int i;
public void collectStuff()
{
i++;
}
public int getI()
{
return i;
}
}
输出
java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at FutureWithCollector.finalize(FutureWithCollector.java:23)
at FutureWithCollector.main(FutureWithCollector.java:15)
0
I'm not sure why I see TimeoutException
as there's no blocking anywhere.
正如 Louis Wasserman 指出的那样,它来自 s.get(1000, TimeUnit.MILLISECONDS)
,这是一个阻塞调用。 (当然,它最多只阻塞1秒。但它仍然阻塞。)
Is finally guaranteed that it will be executed last.
是的...但这不是突出的问题,因为...
as I think .get()
is supposed to be blocked until it's finished?
您没有使用 get()
。您正在使用 get
超时!
在与未来相对应的任务完成之前,不能保证对 get
的超时调用会阻塞。它实际上一直等到任务完成或 1 秒超时到期。 (先到先得。)
如果您想 确定 任务已经完成,请不要在 CompletableFuture
上超时调用 get
。
你不能两全其美1。您要么等到任务完成(这可能需要无限长的时间),要么等待超时(任务可能尚未完成)。
1 - ...除非你有一个完全运行的时间机器,它可以让你进入未来,找出任务的结果,然后回到现在传递那个价值。
当你写 s.get(1000, TimeUnit.MILLISECONDS)
时,编译器警告你处理 TimeoutException
。这也许就是您添加基本 catch 块的原因。
catch (TimeoutException e) {
e.printStackTrace();
}
在这种情况下,查看 javadoc 为该方法说明的内容总是有帮助的,
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
现在您还有 2 个异常需要处理,您的 IDE 或编译器是否也抱怨您要处理它们?
此外,finally
将在 get()
之后执行 returns 来自 Future 的期望值或抛出异常(直到它从 get() 中出来,因为它是一个阻塞打电话)。
这很简单,只需更改其中一种方法即可:
private static CompletableFuture<String> query(Collector collector) {
return CompletableFuture.supplyAsync(() -> {
collector.collectStuff();
return "hello";
});
}
您正在做:
CompletableFuture<String> future = new CompletableFuture<>();
记录为:
Creates a new incomplete CompletableFuture.
基本上,没有人完成这个CompletableFuture
,所以你总是会超时,不管它有多大。
您也可以稍微更改一下代码。如果您想 run
某事,请明确说明:
private static CompletableFuture<Void> query(Collector collector) {
return CompletableFuture.runAsync(collector::collectStuff);
}
那么请注意collectStuff
递增了一个volatile
,但是这些递增不是原子的。
并且您始终可以使用 join
而不是 get
并且不处理已检查的异常(假设没有 join
需要超时)。
我有这段代码,我有两个问题。
- 我不确定为什么会看到 TimeoutException,因为任何地方都没有阻塞。
- 我想用
Collector
实现的是我有一个 class 将进入收集一堆指标,然后CompletableFuture
完全完成我会执行Collector
来发布指标。finally
是否保证它会最后执行,因为我认为.get()
应该被阻止直到它完成?
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
public class FutureWithCollector
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
Collector collector = new Collector();
finalize(() -> query(collector), collector);
}
private static void finalize(Supplier<CompletableFuture<String>> submission, Collector collector) throws ExecutionException, InterruptedException
{
ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture<String> s = CompletableFuture.supplyAsync(submission, executorService).thenCompose(Function.identity());
try {
String result = s.get(1000, TimeUnit.MILLISECONDS);
System.out.println(result);
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
System.out.println(collector.getI());
}
}
private static CompletableFuture<String> query(Collector collector)
{
CompletableFuture<String> future = new CompletableFuture<>();
return future.thenApply(r -> {
collector.collectStuff();
return "Hello";
});
}
}
class Collector
{
private volatile int i;
public void collectStuff()
{
i++;
}
public int getI()
{
return i;
}
}
输出
java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at FutureWithCollector.finalize(FutureWithCollector.java:23)
at FutureWithCollector.main(FutureWithCollector.java:15)
0
I'm not sure why I see
TimeoutException
as there's no blocking anywhere.
正如 Louis Wasserman 指出的那样,它来自 s.get(1000, TimeUnit.MILLISECONDS)
,这是一个阻塞调用。 (当然,它最多只阻塞1秒。但它仍然阻塞。)
Is finally guaranteed that it will be executed last.
是的...但这不是突出的问题,因为...
as I think
.get()
is supposed to be blocked until it's finished?
您没有使用 get()
。您正在使用 get
超时!
在与未来相对应的任务完成之前,不能保证对 get
的超时调用会阻塞。它实际上一直等到任务完成或 1 秒超时到期。 (先到先得。)
如果您想 确定 任务已经完成,请不要在 CompletableFuture
上超时调用 get
。
你不能两全其美1。您要么等到任务完成(这可能需要无限长的时间),要么等待超时(任务可能尚未完成)。
1 - ...除非你有一个完全运行的时间机器,它可以让你进入未来,找出任务的结果,然后回到现在传递那个价值。
当你写 s.get(1000, TimeUnit.MILLISECONDS)
时,编译器警告你处理 TimeoutException
。这也许就是您添加基本 catch 块的原因。
catch (TimeoutException e) {
e.printStackTrace();
}
在这种情况下,查看 javadoc 为该方法说明的内容总是有帮助的,
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
现在您还有 2 个异常需要处理,您的 IDE 或编译器是否也抱怨您要处理它们?
此外,finally
将在 get()
之后执行 returns 来自 Future 的期望值或抛出异常(直到它从 get() 中出来,因为它是一个阻塞打电话)。
这很简单,只需更改其中一种方法即可:
private static CompletableFuture<String> query(Collector collector) {
return CompletableFuture.supplyAsync(() -> {
collector.collectStuff();
return "hello";
});
}
您正在做:
CompletableFuture<String> future = new CompletableFuture<>();
记录为:
Creates a new incomplete CompletableFuture.
基本上,没有人完成这个CompletableFuture
,所以你总是会超时,不管它有多大。
您也可以稍微更改一下代码。如果您想 run
某事,请明确说明:
private static CompletableFuture<Void> query(Collector collector) {
return CompletableFuture.runAsync(collector::collectStuff);
}
那么请注意collectStuff
递增了一个volatile
,但是这些递增不是原子的。
并且您始终可以使用 join
而不是 get
并且不处理已检查的异常(假设没有 join
需要超时)。