使用 FutureTask 实现并发

use FutureTask for concurrency

我有这样的服务:

class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...

    }
}

现在我想要它更快,我发现一些过滤器可以同时启动,而一些过滤器必须等待其他过滤器完成。例如:

filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--

这意味着:

1.filter1和filter2可以同时启动,filter3和filter4也可以

2.filter3 和 filter4 必须等待 filter2 完成

还有一件事:

如果 filter2 returns 为真,则 'process' 方法 returns 立即并忽略以下过滤器。

现在我的解决方案正在使用 FutureTask:

            // do filter's work at FutureTask
        for (Filter filter : filters) {
            FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
            executorService.execute(futureTask);
        }

        //when all FutureTask are submitted, wait for result
        for(Filter filter : filters) {
            if (filter.isReturnNeeded()) {
                FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
                riskResult = futureTask.get();
                if (canReturn(filter, riskResult)) {
                    returnOk = true;
                    return riskResult;
                }
            }
        }

我的 CallableFilter:

public class CallableFilter implements Callable<RiskResult> {

    private Filter filter;
    private Context context;

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && dependencies.size() > 0) {

            //wait for its dependency filters to finish
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();

            }
        }

        //do its own work
        return filter.execute(context);
    }
}

我想知道:

1.is 在这种情况下使用 FutureTask 是个好主意吗?有更好的解决方案吗?

2.the线程上下文切换的开销。

谢谢!

在 Java 8 中,您可以使用 CompletableFuture 一个接一个地链接您的过滤器。使用 thenApply 和 thenCompose 系列方法来向 CompletableFuture 添加新的异步过滤器 - 它们将在上一步完成后执行。 thenCombine 在两个独立的 CompletableFutures 都完成时合并它们。使用 allOf 等待两个以上的 CompletableFuture 对象的结果。

如果不能用Java8,那就用GuavaListenableFuture can do the same, see Listenable Future Explained。使用 Guava,您可以等待多个独立的 运行 过滤器完成 Futures.allAsList - 这也是 return 一个 ListenableFuture。

这两种方法的想法是,在您声明您的未来操作、它们之间的依赖关系以及它们的线程之后,您会得到一个封装最终结果的 Future 对象。

编辑:早期的 return 可以通过使用 complete() 方法显式完成 CompletableFuture 或使用 Guava SettableFuture(实现 ListenableFuture)来实现

您可以使用 ForkJoinPool 进行并行化,这对于那种并行计算是明确考虑的:

(...) Method join() and its variants are appropriate for use only when completion dependencies are acyclic; that is, the parallel computation can be described as a directed acyclic graph (DAG) (...)

(参见 ForkJoinTask

ForkJoinPool 的优点是每个任务都可以生成新任务并等待它们完成而不会实际阻塞正在执行的线程(否则如果更多任务正在等待其他任务可能会导致死锁)完成比线程可用)。

这是一个到目前为止应该有效的示例,尽管它还有一些局限性:

  1. 它忽略过滤结果
  2. 如果过滤器 2 returns true
  3. ,它不会提前结束执行
  4. 未实现异常处理

此代码背后的主要思想:每个过滤器都表示为 Node,它可能依赖于其他节点(= 在该过滤器可以执行之前必须完成的过滤器)。从属节点作为并行任务生成。

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Node<V> extends RecursiveTask<V> {
    private static final short VISITED = 1;

    private final Callable<V> callable;
    private final Set<Node<V>> dependencies = new HashSet<>();

    @SafeVarargs
    public Node(Callable<V> callable, Node<V>... dependencies) {
        this.callable = callable;
        this.dependencies.addAll(Arrays.asList(dependencies));
    }

    public Set<Node<V>> getDependencies() {
        return this.dependencies;
    }

    @Override
    protected V compute() {
        try {
            // resolve dependencies first
            for (Node<V> node : dependencies) {
                if (node.tryMarkVisited()) {
                    node.fork(); // start node
                }
            }

            // wait for ALL nodes to complete
            for (Node<V> node : dependencies) {
                node.join();
            }

            return callable.call();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }

    public boolean tryMarkVisited() {
        return compareAndSetForkJoinTaskTag((short) 0, VISITED);
    }
}

用法示例:

public static void main(String[] args) {
    Node<Void> filter1 = new Node<>(filter("filter1"));
    Node<Void> filter2 = new Node<>(filter("filter2"));
    Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);
    Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);
    Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);
    Node<Void> root = new Node<>(() -> null, filter5);

    ForkJoinPool.commonPool().invoke(root);
}

public static Callable<Void> filter(String name) {
    return () -> {
        System.out.println(Thread.currentThread().getName() + ": start " + name);
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + ": end   " + name);
        return null;
    };
}