为共享计算实现 Future 接口
Implementing Future interface for shared computation
我正在实现 Future<Collection<Integer>>
接口,以便在应用程序的所有线程之间共享一些批量计算的结果。
事实上,我打算将 class 实现 Future<Collection<Integer>>
的实例放入 ApplicationScope 对象中,以便任何其他需要结果的线程只需要 Future
从 object
调用方法 get()
,因此使用另一个线程执行的计算。
我的问题是关于实现 cancel
方法。现在,我会写这样的东西:
public class CustomerFutureImpl implements Future<Collection<Integer>>{
private Thread computationThread;
private boolean started;
private boolean cancelled;
private Collection<Integer> computationResult;
private boolean cancel(boolean mayInterruptIfRunning){
if( computationResult != null )
return false;
if( !started ){
cancelled = true;
return true;
} else {
if(mayInterruptIfRunning)
computationThread.interrupt();
}
}
//The rest of the methods
}
但是方法实现不满足Future的文档,因为我们需要在任何等待结果的线程中抛出CancellationException
(调用了get()
方法) .
我是否应该添加另一个字段,如 private Collection<Thread> waitingForTheResultThreads;
,然后从 Collection
中断每个线程,捕获中断的异常,然后 throw new CancellationException()
?
问题是这样的解决方案对我来说似乎有点奇怪......对此不确定。
通常,您应该完全避免直接实施 Future
。并发代码 非常 很难正确,分布式执行的框架 - 特别是 ExecutorService
- 将提供 Future
引用你关心的工作单元的实例。
您可能已经知道并且正在有意创建一个新的类似服务,但我觉得重要的是要指出,对于 大量 大多数用例,您不需要定义您自己的 Future
实现。
您可能想看看 Guava 提供的并发工具,特别是 ListenableFuture
,它是 Future
的子接口,提供了额外的功能。
假设您确实想要定义自定义 Future
类型,请使用 Guava 的 AbstractFuture
实现作为起点,这样您就不必重新发明您正在使用的复杂细节运行 成.
对于您的具体问题,如果您查看 implementation of AbstractFuture.get()
,您会发现它是通过 while
循环实现的,该循环查找 value
变为非空,那时它调用 getDoneValue()
或 return 值或引发 CancellationException
。所以本质上,每个阻塞调用 Future.get()
的线程都会每隔一段时间轮询 Future.value
字段并在检测到 Future
已被取消时引发 CancellationException
.没有必要跟踪 Collection<Thread>
或任何类似的东西,因为每个线程都可以独立检查 Future
的状态,并且 return 或根据需要抛出。
我正在实现 Future<Collection<Integer>>
接口,以便在应用程序的所有线程之间共享一些批量计算的结果。
事实上,我打算将 class 实现 Future<Collection<Integer>>
的实例放入 ApplicationScope 对象中,以便任何其他需要结果的线程只需要 Future
从 object
调用方法 get()
,因此使用另一个线程执行的计算。
我的问题是关于实现 cancel
方法。现在,我会写这样的东西:
public class CustomerFutureImpl implements Future<Collection<Integer>>{
private Thread computationThread;
private boolean started;
private boolean cancelled;
private Collection<Integer> computationResult;
private boolean cancel(boolean mayInterruptIfRunning){
if( computationResult != null )
return false;
if( !started ){
cancelled = true;
return true;
} else {
if(mayInterruptIfRunning)
computationThread.interrupt();
}
}
//The rest of the methods
}
但是方法实现不满足Future的文档,因为我们需要在任何等待结果的线程中抛出CancellationException
(调用了get()
方法) .
我是否应该添加另一个字段,如 private Collection<Thread> waitingForTheResultThreads;
,然后从 Collection
中断每个线程,捕获中断的异常,然后 throw new CancellationException()
?
问题是这样的解决方案对我来说似乎有点奇怪......对此不确定。
通常,您应该完全避免直接实施 Future
。并发代码 非常 很难正确,分布式执行的框架 - 特别是 ExecutorService
- 将提供 Future
引用你关心的工作单元的实例。
您可能已经知道并且正在有意创建一个新的类似服务,但我觉得重要的是要指出,对于 大量 大多数用例,您不需要定义您自己的 Future
实现。
您可能想看看 Guava 提供的并发工具,特别是 ListenableFuture
,它是 Future
的子接口,提供了额外的功能。
假设您确实想要定义自定义 Future
类型,请使用 Guava 的 AbstractFuture
实现作为起点,这样您就不必重新发明您正在使用的复杂细节运行 成.
对于您的具体问题,如果您查看 implementation of AbstractFuture.get()
,您会发现它是通过 while
循环实现的,该循环查找 value
变为非空,那时它调用 getDoneValue()
或 return 值或引发 CancellationException
。所以本质上,每个阻塞调用 Future.get()
的线程都会每隔一段时间轮询 Future.value
字段并在检测到 Future
已被取消时引发 CancellationException
.没有必要跟踪 Collection<Thread>
或任何类似的东西,因为每个线程都可以独立检查 Future
的状态,并且 return 或根据需要抛出。