Java 执行器得到第一个符合条件的结果

Java Executor get first result that matches condition

当 Callable return 的结果与条件匹配时,有没有办法停止 Javas ExecuterService?

我有以下算法(代码A):

MyObject functionA(SomeObject someObject) {
    for(int i = 0; i < 100; ++i) {
        MyObject result = someFunction(i, someObject);
        if (result != null) {
            return result;
        }
    }
    return null;
}

为了加快速度,我将代码更改为(代码 B):

MyObject functionB(SomeObject someObject) {
    final ExecutorService executorService = Executors.newCachedThreadPool();
    List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        Callable<MyObject> newCallable = () -> {
            return someFunction(finalI, someObject);
        };
        callableList.add(newCallable);
    }
    List<Future<MyObject>> futures = executorService.invokeAll(callableList);
    for(int i = 0; i < futures.size(); ++i) {
        Future<MyObject> future = futures.get(i);
        if(future.get() != null) {
            return future.get();
        }
    }
    return null;
}

平均函数 B 运行 是函数 A 的 5 倍(在 8 个内核上)。

这很好,但并不完美。在 functionA 中,someFunction() 在找到结果 != null 之前平均被调用大约 20 次。在 functionB 中,someFunction() 总是被调用 100 次。

有没有办法

a) 停止 executorService,当第一个线程结束时结果为 != null

或更好

b) 停止 executorService,当一个线程完成并且结果 != null 被一个线程和所有线程发现时,其 finalI 值低于发现结果 != null 的线程,并以 reuslt = 结束=空

谢谢

--------编辑--------

感谢 Alex Crazy 和 Saxon 的回答,我能够回答我的问题 A。

  1. 所有结果不满足我条件的 Callables 都应该抛出异常
  2. 我应该使用 invokeAny 而不是 invokeAll
  3. 我应该使用 newFixedThreadPool 而不是 newCachedThreadPool

FixedThreadPool 的使用很重要,因为 CachedThreadPool 运行 所有 100 个任务都是并行的。 FixedThreadPool 仅 运行 与您定义的并行任务一样多。新任务只有在旧任务完成后才开始。

如果您不检查您的可调用对象是否被中断(因为 invokeAny 找到了结果),那么所有可调用对象都将 运行 直到它们完成。 因此,使用 invokeAny 可以更快地获得结果,但处理器的工作时间与使用 invokeAll 一样长。

所以我的新函数如下所示:

MyObject functionC(SomeObject someObject) {
    final ExecutorService executorService = Executors.newFixedThreadPool(8);
    List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        Callable<MyObject> newCallable = () -> {
            MyObject result = someFunction(finalI, someObject);
            if(result == null) {
                throw new Exception();
            }
            return result;
        };
        callableList.add(newCallable);
    }
    try {
        MyObject result = executorService.invokeAny(callableList);
        return result;
    } catch (InterruptedException | ExecutionException e) {
        return null;
    }
}

functionC 解决了我的问题 A,但它并不总是 return 由 someFunction() 创建的具有最低输入值 finalI 的 MyObject 对象。 例如。 someFunction(3, someObject) 和 someFunction(6, someObject) 会 return 一个不为 null 的 MyObject 对象。
在某些 运行s 中,someFunction(3, someObject) 的结果将是 returned,而在其他 运行s 中,someFunction(6, someObject) 的结果将是 return编辑。 它不是确定性的。

所以我写了新的class ResultCallable

public abstract class ResultCallable<E>  implements Callable<E> {
    E result = null;
}

和函数D

MyObject functionD(SomeObject someObject) {
    final ExecutorService executorService = Executors.newFixedThreadPool(8);
    List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        ResultCallable<MyObject> newCallable = () -> {
            this.result = someFunction(finalI, someObject);
            if(this.result == null) {
                throw new Exception();
            }
            return this.result;
        };
        callableList.add(newCallable);
    }
    try {
        MyObject result = executorService.invokeAny(callableList);
        executorService.shutdownNow();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        for(int i = 0; i < callableList.size(); ++i) {
            if(callableList.get(i).result != null) {
                return callableList.get(i).result;
            }
        }
    } catch (InterruptedException | ExecutionException e) {
        return null;
    }
    return null;
}

如果 someFunction(3, someObject)someFunction(6, someObject) 将 return 一个不为空的 MyObject 对象,现在主线程在第一个有效结果之后等待所有其他启动的任务完成(使用newFixedThreadPool(8) 还有最多 7 个其他任务 运行ning)。 因为 executorService 运行 按照它们在 callableList 中的顺序排列可调用项,所有具有比第一个有效结果 returned 的可调用项更低的 finalI 值的可调用项已经开始并且将在第 19 行之前完成。

这很好,但并不完美。
假设finalI = 6的任务先完成,finalI从0到5、7的任务还在运行ning。 那么因为 executorService.shutdownNow();没有其他任务将开始,主线程将等待所有 运行ning 任务完成。
但是我只需要等待 finalI 在 0 到 3 之间的任务完成即可。任务3因为它也有结果!= null和任务0、1、2证明任务3下面没有结果。

我可以用类似下面的东西替换 executorService.awaitTermination 和 for 循环吗?

for(int i = 0; i < callableList.size(); ++i) {
    awaitTermination(callableList.get(i));
    if(callableList.get(i).result != null) {
        return callableList.get(i).result;
    }
}

awaitTermination(callableList.get(i)); 是伪代码。我找不到等待调用执行的函数。

------------------------ 编辑 2 ------------------ ----

我自己解决了

我将 final CountDownLatch countDownLatch = new CountDownLatch(1); 添加到我的 class ResultCallable。
this.result = someFunction(finalI, someObject); 之后我添加了 this.countDownLatch.countDown(); awaitTermination(callableList.get(i));callableList.get(i).countDownLatch.await();

取代

如果你有更好的想法,请告诉我。

为什么不使用 invokeAny 而不是 invokeAll。当结果不好时,抛出异常而不是 returning null 就足够了,并且在 ExecutorService 调用时 return MyObject 而不是 List 就足够了。

解决您的问题的正确方法是:

  1. 在某个地方创建额外的变量,如 private boolean isFinished,它将在所有线程之间共享,并检查此变量是否定期更改其值

  2. 如果可能的话,你必须让你的 Callable 看起来像这样:

     public class SomeRunnableFunction implements Callable {
    
          private volatile boolean running = true;
    
          public void terminate() {
              running = false;
          }
    
          @Override
          public Object call() throws Exception {
               while (running) {
                    // your custom logic
                    return new Object();
               }
               return null;
          }
    }
    
  3. 然后一个线程完成将 private boolean isFinished 更改为 true 然后您必须通过调用 someRunnableFunction.terminate()

    取消所有已经启动的任务
  4. 您也可以使用 feature.cancel() 代替 someRunnableFunction.terminate() 然后您的可调用对象将如下所示:

     public class SomeRunnableFunction implements Callable {
    
          @Override
          public Object call() throws Exception {
               while (true) {
                    // Check regularly if the thread has been
                    // interrupted and if so throws an exception to stop
                    // the task immediately 
                    if (Thread.currentThread().isInterrupted()) {
                         throw new InterruptedException("Thread interrupted");
                    }
                    // your custom logic
                    return new Object();
               }
               return null;
          }
    }