Java 执行器得到第一个符合条件的结果
Java Executor get first result that matches condition
当 Callable return 的结果与条件匹配时,有没有办法停止 Javas ExecuterService?
我有以下算法(代码A):
MyObject functionA(SomeObject someObject) {
for(int i = 0; i < 100; ++i) {
MyObject result = someFunction(i, someObject);
if (result != null) {
return result;
}
}
return null;
}
为了加快速度,我将代码更改为(代码 B):
MyObject functionB(SomeObject someObject) {
final ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
Callable<MyObject> newCallable = () -> {
return someFunction(finalI, someObject);
};
callableList.add(newCallable);
}
List<Future<MyObject>> futures = executorService.invokeAll(callableList);
for(int i = 0; i < futures.size(); ++i) {
Future<MyObject> future = futures.get(i);
if(future.get() != null) {
return future.get();
}
}
return null;
}
平均函数 B 运行 是函数 A 的 5 倍(在 8 个内核上)。
这很好,但并不完美。在 functionA 中,someFunction() 在找到结果 != null 之前平均被调用大约 20 次。在 functionB 中,someFunction() 总是被调用 100 次。
有没有办法
a) 停止 executorService,当第一个线程结束时结果为 != null
或更好
b) 停止 executorService,当一个线程完成并且结果 != null 被一个线程和所有线程发现时,其 finalI 值低于发现结果 != null 的线程,并以 reuslt = 结束=空
谢谢
--------编辑--------
感谢 Alex Crazy 和 Saxon 的回答,我能够回答我的问题 A。
- 所有结果不满足我条件的 Callables 都应该抛出异常
- 我应该使用 invokeAny 而不是 invokeAll
- 我应该使用 newFixedThreadPool 而不是 newCachedThreadPool
FixedThreadPool 的使用很重要,因为 CachedThreadPool 运行 所有 100 个任务都是并行的。
FixedThreadPool 仅 运行 与您定义的并行任务一样多。新任务只有在旧任务完成后才开始。
如果您不检查您的可调用对象是否被中断(因为 invokeAny 找到了结果),那么所有可调用对象都将 运行 直到它们完成。
因此,使用 invokeAny 可以更快地获得结果,但处理器的工作时间与使用 invokeAll 一样长。
所以我的新函数如下所示:
MyObject functionC(SomeObject someObject) {
final ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
Callable<MyObject> newCallable = () -> {
MyObject result = someFunction(finalI, someObject);
if(result == null) {
throw new Exception();
}
return result;
};
callableList.add(newCallable);
}
try {
MyObject result = executorService.invokeAny(callableList);
return result;
} catch (InterruptedException | ExecutionException e) {
return null;
}
}
functionC 解决了我的问题 A,但它并不总是 return 由 someFunction() 创建的具有最低输入值 finalI 的 MyObject 对象。
例如。 someFunction(3, someObject) 和 someFunction(6, someObject) 会 return 一个不为 null 的 MyObject 对象。
在某些 运行s 中,someFunction(3, someObject) 的结果将是 returned,而在其他 运行s 中,someFunction(6, someObject) 的结果将是 return编辑。
它不是确定性的。
所以我写了新的class ResultCallable
public abstract class ResultCallable<E> implements Callable<E> {
E result = null;
}
和函数D
MyObject functionD(SomeObject someObject) {
final ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
ResultCallable<MyObject> newCallable = () -> {
this.result = someFunction(finalI, someObject);
if(this.result == null) {
throw new Exception();
}
return this.result;
};
callableList.add(newCallable);
}
try {
MyObject result = executorService.invokeAny(callableList);
executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.SECONDS);
for(int i = 0; i < callableList.size(); ++i) {
if(callableList.get(i).result != null) {
return callableList.get(i).result;
}
}
} catch (InterruptedException | ExecutionException e) {
return null;
}
return null;
}
如果 someFunction(3, someObject)
和 someFunction(6, someObject)
将 return 一个不为空的 MyObject 对象,现在主线程在第一个有效结果之后等待所有其他启动的任务完成(使用newFixedThreadPool(8) 还有最多 7 个其他任务 运行ning)。
因为 executorService 运行 按照它们在 callableList 中的顺序排列可调用项,所有具有比第一个有效结果 returned 的可调用项更低的 finalI 值的可调用项已经开始并且将在第 19 行之前完成。
这很好,但并不完美。
假设finalI = 6
的任务先完成,finalI从0到5、7的任务还在运行ning。
那么因为 executorService.shutdownNow();没有其他任务将开始,主线程将等待所有 运行ning 任务完成。
但是我只需要等待 finalI 在 0 到 3 之间的任务完成即可。任务3因为它也有结果!= null和任务0、1、2证明任务3下面没有结果。
我可以用类似下面的东西替换 executorService.awaitTermination 和 for 循环吗?
for(int i = 0; i < callableList.size(); ++i) {
awaitTermination(callableList.get(i));
if(callableList.get(i).result != null) {
return callableList.get(i).result;
}
}
awaitTermination(callableList.get(i));
是伪代码。我找不到等待调用执行的函数。
------------------------ 编辑 2 ------------------ ----
我自己解决了
我将 final CountDownLatch countDownLatch = new CountDownLatch(1);
添加到我的 class ResultCallable。
在 this.result = someFunction(finalI, someObject);
之后我添加了 this.countDownLatch.countDown();
awaitTermination(callableList.get(i));
被 callableList.get(i).countDownLatch.await();
取代
如果你有更好的想法,请告诉我。
为什么不使用 invokeAny 而不是 invokeAll。当结果不好时,抛出异常而不是 returning null 就足够了,并且在 ExecutorService 调用时 return MyObject 而不是 List 就足够了。
解决您的问题的正确方法是:
在某个地方创建额外的变量,如 private boolean isFinished
,它将在所有线程之间共享,并检查此变量是否定期更改其值
如果可能的话,你必须让你的 Callable 看起来像这样:
public class SomeRunnableFunction implements Callable {
private volatile boolean running = true;
public void terminate() {
running = false;
}
@Override
public Object call() throws Exception {
while (running) {
// your custom logic
return new Object();
}
return null;
}
}
然后一个线程完成将 private boolean isFinished
更改为 true 然后您必须通过调用 someRunnableFunction.terminate()
取消所有已经启动的任务
您也可以使用 feature.cancel()
代替 someRunnableFunction.terminate()
然后您的可调用对象将如下所示:
public class SomeRunnableFunction implements Callable {
@Override
public Object call() throws Exception {
while (true) {
// Check regularly if the thread has been
// interrupted and if so throws an exception to stop
// the task immediately
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
// your custom logic
return new Object();
}
return null;
}
}
当 Callable return 的结果与条件匹配时,有没有办法停止 Javas ExecuterService?
我有以下算法(代码A):
MyObject functionA(SomeObject someObject) {
for(int i = 0; i < 100; ++i) {
MyObject result = someFunction(i, someObject);
if (result != null) {
return result;
}
}
return null;
}
为了加快速度,我将代码更改为(代码 B):
MyObject functionB(SomeObject someObject) {
final ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
Callable<MyObject> newCallable = () -> {
return someFunction(finalI, someObject);
};
callableList.add(newCallable);
}
List<Future<MyObject>> futures = executorService.invokeAll(callableList);
for(int i = 0; i < futures.size(); ++i) {
Future<MyObject> future = futures.get(i);
if(future.get() != null) {
return future.get();
}
}
return null;
}
平均函数 B 运行 是函数 A 的 5 倍(在 8 个内核上)。
这很好,但并不完美。在 functionA 中,someFunction() 在找到结果 != null 之前平均被调用大约 20 次。在 functionB 中,someFunction() 总是被调用 100 次。
有没有办法
a) 停止 executorService,当第一个线程结束时结果为 != null
或更好
b) 停止 executorService,当一个线程完成并且结果 != null 被一个线程和所有线程发现时,其 finalI 值低于发现结果 != null 的线程,并以 reuslt = 结束=空
谢谢
--------编辑--------
感谢 Alex Crazy 和 Saxon 的回答,我能够回答我的问题 A。
- 所有结果不满足我条件的 Callables 都应该抛出异常
- 我应该使用 invokeAny 而不是 invokeAll
- 我应该使用 newFixedThreadPool 而不是 newCachedThreadPool
FixedThreadPool 的使用很重要,因为 CachedThreadPool 运行 所有 100 个任务都是并行的。 FixedThreadPool 仅 运行 与您定义的并行任务一样多。新任务只有在旧任务完成后才开始。
如果您不检查您的可调用对象是否被中断(因为 invokeAny 找到了结果),那么所有可调用对象都将 运行 直到它们完成。 因此,使用 invokeAny 可以更快地获得结果,但处理器的工作时间与使用 invokeAll 一样长。
所以我的新函数如下所示:
MyObject functionC(SomeObject someObject) {
final ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
Callable<MyObject> newCallable = () -> {
MyObject result = someFunction(finalI, someObject);
if(result == null) {
throw new Exception();
}
return result;
};
callableList.add(newCallable);
}
try {
MyObject result = executorService.invokeAny(callableList);
return result;
} catch (InterruptedException | ExecutionException e) {
return null;
}
}
functionC 解决了我的问题 A,但它并不总是 return 由 someFunction() 创建的具有最低输入值 finalI 的 MyObject 对象。
例如。 someFunction(3, someObject) 和 someFunction(6, someObject) 会 return 一个不为 null 的 MyObject 对象。
在某些 运行s 中,someFunction(3, someObject) 的结果将是 returned,而在其他 运行s 中,someFunction(6, someObject) 的结果将是 return编辑。
它不是确定性的。
所以我写了新的class ResultCallable
public abstract class ResultCallable<E> implements Callable<E> {
E result = null;
}
和函数D
MyObject functionD(SomeObject someObject) {
final ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
ResultCallable<MyObject> newCallable = () -> {
this.result = someFunction(finalI, someObject);
if(this.result == null) {
throw new Exception();
}
return this.result;
};
callableList.add(newCallable);
}
try {
MyObject result = executorService.invokeAny(callableList);
executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.SECONDS);
for(int i = 0; i < callableList.size(); ++i) {
if(callableList.get(i).result != null) {
return callableList.get(i).result;
}
}
} catch (InterruptedException | ExecutionException e) {
return null;
}
return null;
}
如果 someFunction(3, someObject)
和 someFunction(6, someObject)
将 return 一个不为空的 MyObject 对象,现在主线程在第一个有效结果之后等待所有其他启动的任务完成(使用newFixedThreadPool(8) 还有最多 7 个其他任务 运行ning)。
因为 executorService 运行 按照它们在 callableList 中的顺序排列可调用项,所有具有比第一个有效结果 returned 的可调用项更低的 finalI 值的可调用项已经开始并且将在第 19 行之前完成。
这很好,但并不完美。
假设finalI = 6
的任务先完成,finalI从0到5、7的任务还在运行ning。
那么因为 executorService.shutdownNow();没有其他任务将开始,主线程将等待所有 运行ning 任务完成。
但是我只需要等待 finalI 在 0 到 3 之间的任务完成即可。任务3因为它也有结果!= null和任务0、1、2证明任务3下面没有结果。
我可以用类似下面的东西替换 executorService.awaitTermination 和 for 循环吗?
for(int i = 0; i < callableList.size(); ++i) {
awaitTermination(callableList.get(i));
if(callableList.get(i).result != null) {
return callableList.get(i).result;
}
}
awaitTermination(callableList.get(i));
是伪代码。我找不到等待调用执行的函数。
------------------------ 编辑 2 ------------------ ----
我自己解决了
我将 final CountDownLatch countDownLatch = new CountDownLatch(1);
添加到我的 class ResultCallable。
在 this.result = someFunction(finalI, someObject);
之后我添加了 this.countDownLatch.countDown();
awaitTermination(callableList.get(i));
被 callableList.get(i).countDownLatch.await();
如果你有更好的想法,请告诉我。
为什么不使用 invokeAny 而不是 invokeAll。当结果不好时,抛出异常而不是 returning null 就足够了,并且在 ExecutorService 调用时 return MyObject 而不是 List
解决您的问题的正确方法是:
在某个地方创建额外的变量,如
private boolean isFinished
,它将在所有线程之间共享,并检查此变量是否定期更改其值如果可能的话,你必须让你的 Callable 看起来像这样:
public class SomeRunnableFunction implements Callable { private volatile boolean running = true; public void terminate() { running = false; } @Override public Object call() throws Exception { while (running) { // your custom logic return new Object(); } return null; } }
然后一个线程完成将
取消所有已经启动的任务private boolean isFinished
更改为 true 然后您必须通过调用someRunnableFunction.terminate()
您也可以使用
feature.cancel()
代替someRunnableFunction.terminate()
然后您的可调用对象将如下所示:public class SomeRunnableFunction implements Callable { @Override public Object call() throws Exception { while (true) { // Check regularly if the thread has been // interrupted and if so throws an exception to stop // the task immediately if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("Thread interrupted"); } // your custom logic return new Object(); } return null; } }