为共享计算实现 Future 接口

Implementing Future interface for shared computation

我正在实现 Future<Collection<Integer>> 接口,以便在应用程序的所有线程之间共享一些批量计算的结果。

事实上,我打算将 class 实现 Future<Collection<Integer>> 的实例放入 ApplicationScope 对象中,以便任何其他需要结果的线程只需要 Futureobject 调用方法 get() ,因此使用另一个线程执行的计算。

我的问题是关于实现 cancel 方法。现在,我会写这样的东西:

public class CustomerFutureImpl implements Future<Collection<Integer>>{

    private Thread computationThread;
    private boolean started;
    private boolean cancelled;
    private Collection<Integer> computationResult;

    private boolean cancel(boolean mayInterruptIfRunning){
        if( computationResult != null )
            return false;
        if( !started ){
            cancelled = true;
            return true;
        } else {
            if(mayInterruptIfRunning)
                 computationThread.interrupt();
        }
    }

    //The rest of the methods
}

但是方法实现不满足Future的文档,因为我们需要在任何等待结果的线程中抛出CancellationException(调用了get()方法) .

我是否应该添加另一个字段,如 private Collection<Thread> waitingForTheResultThreads;,然后从 Collection 中断每个线程,捕获中断的异常,然后 throw new CancellationException()

问题是这样的解决方案对我来说似乎有点奇怪......对此不确定。

通常,您应该完全避免直接实施 Future。并发代码 非常 很难正确,分布式执行的框架 - 特别是 ExecutorService - 将提供 Future 引用你关心的工作单元的实例。

您可能已经知道并且正在有意创建一个新的类似服务,但我觉得重要的是要指出,对于 大量 大多数用例,您不需要定义您自己的 Future 实现。

您可能想看看 Guava 提供的并发工具,特别是 ListenableFuture,它是 Future 的子接口,提供了额外的功能。


假设您确实想要定义自定义 Future 类型,请使用 Guava 的 AbstractFuture 实现作为起点,这样您就不必重新发明您正在使用的复杂细节运行 成.

对于您的具体问题,如果您查看 implementation of AbstractFuture.get(),您会发现它是通过 while 循环实现的,该循环查找 value 变为非空,那时它调用 getDoneValue() 或 return 值或引发 CancellationException。所以本质上,每个阻塞调用 Future.get() 的线程都会每隔一段时间轮询 Future.value 字段并在检测到 Future 已被取消时引发 CancellationException .没有必要跟踪 Collection<Thread> 或任何类似的东西,因为每个线程都可以独立检查 Future 的状态,并且 return 或根据需要抛出。