Java 清除 CompletionService 工作队列
Java Clear CompletionService Working Queue
我正在编写一个程序,该程序使用 CompletionService
到 运行 线程分析一堆不同的对象,其中每个“分析”包括接收一个字符串并进行一些计算以给出true
或 false
作为答案。我的代码基本上是这样的:
// tasks come from a different method and contain the strings + some other needed info
List<Future<Pair<Pieces,Boolean>>> futures = new ArrayList<>(tasks.size());
for (Task task : tasks) {
futures.add(executorCompletionService.submit(task));
}
ArrayList<Pair<Pieces, Boolean>> pairs = new ArrayList<>();
int toComplete = tasks.size();
int received = 0;
int failed = 0;
while (received < toComplete) {
Future<Pair<Pieces, Boolean>> resFuture = executorCompletionService.take();
received++;
Pair<Pieces, Boolean> res = resFuture.get();
if (!res.getValue()) failed++;
if (failed > 300) {
// My problem is here
}
pairs.add(res);
}
// return pairs and go on to do something else
在标记的部分,我的目标是让它在超过 300 个字符串失败时放弃计算,这样我就可以继续进行新的分析,使用一些不同的数据再次调用此方法。问题是,由于再次使用相同的 CompletionService
,如果我不以某种方式清除队列,那么工作队列将继续增长,因为我每次使用它时都会不断添加更多内容(因为在 300 次失败之后可能还有许多未处理的字符串)。
我曾尝试遍历 futures
列表并使用 futures.foreach(future -> future.cancel(true)
之类的方法删除所有未完成的任务,但是当我下次调用该方法时,出现 java.util.concurrent.CancellationException
错误试着打电话给 resFuture.get()
.
(编辑:似乎即使我调用 foreach(future->future.cancel(true))
,这也不能保证 workerQueue
之后实际上是清楚的。我不明白为什么会这样。看起来好像清空队列需要一段时间,并且代码不会等到清空后再进行下一次分析,因此偶尔 get
会在已取消的未来调用。)
我也试过
while (received < toComplete) {
executorCompletionService.take();
received++;
}
要清空队列,虽然它可以工作,但它只比 运行 进行所有分析快一点,因此效率不是很好。
我的问题是是否有更好的方法来清空工作队列,这样当我下次调用此代码时,就好像 CompletionService
又是新的一样。
编辑:我尝试过的另一种方法是设置 executorCompletionService = new CompletionService
,这比我的其他解决方案稍快,但仍然相当慢,绝对不是好的做法。
P.S.: 也很高兴接受任何其他可能的方式,我不喜欢使用 CompletionService
它只是我所做的最简单的事情到目前为止
此问题已得到解决,但我看到其他类似问题没有很好的答案,所以这是我的解决方案:
之前,我使用 ExecutorService
创建我的 ExecutorCompletionService(ExecutorService)
。我将 ExecutorService
切换为 ThreadPoolExecutor
,因为在支持中 ExecutorService
已经是 ThreadPoolExecutor
所有方法签名都可以通过强制转换来修复。使用 ThreadPoolExecutor
可以让你在后端有更多的自由,特别是你可以调用 threadPoolExecutor.getQueue().clear()
来清除所有等待完成的任务。最后,我需要确保“耗尽”剩余的工作任务,因此我最终的取消代码如下所示:
if (failed > maxFailures) {
executorService.getQueue().clear();
while (executorService.getActiveCount() > 0) {
executorCompletionService.poll();
}
在此代码块结束时,执行程序将准备好再次 运行。
我正在编写一个程序,该程序使用 CompletionService
到 运行 线程分析一堆不同的对象,其中每个“分析”包括接收一个字符串并进行一些计算以给出true
或 false
作为答案。我的代码基本上是这样的:
// tasks come from a different method and contain the strings + some other needed info
List<Future<Pair<Pieces,Boolean>>> futures = new ArrayList<>(tasks.size());
for (Task task : tasks) {
futures.add(executorCompletionService.submit(task));
}
ArrayList<Pair<Pieces, Boolean>> pairs = new ArrayList<>();
int toComplete = tasks.size();
int received = 0;
int failed = 0;
while (received < toComplete) {
Future<Pair<Pieces, Boolean>> resFuture = executorCompletionService.take();
received++;
Pair<Pieces, Boolean> res = resFuture.get();
if (!res.getValue()) failed++;
if (failed > 300) {
// My problem is here
}
pairs.add(res);
}
// return pairs and go on to do something else
在标记的部分,我的目标是让它在超过 300 个字符串失败时放弃计算,这样我就可以继续进行新的分析,使用一些不同的数据再次调用此方法。问题是,由于再次使用相同的 CompletionService
,如果我不以某种方式清除队列,那么工作队列将继续增长,因为我每次使用它时都会不断添加更多内容(因为在 300 次失败之后可能还有许多未处理的字符串)。
我曾尝试遍历 futures
列表并使用 futures.foreach(future -> future.cancel(true)
之类的方法删除所有未完成的任务,但是当我下次调用该方法时,出现 java.util.concurrent.CancellationException
错误试着打电话给 resFuture.get()
.
(编辑:似乎即使我调用 foreach(future->future.cancel(true))
,这也不能保证 workerQueue
之后实际上是清楚的。我不明白为什么会这样。看起来好像清空队列需要一段时间,并且代码不会等到清空后再进行下一次分析,因此偶尔 get
会在已取消的未来调用。)
我也试过
while (received < toComplete) {
executorCompletionService.take();
received++;
}
要清空队列,虽然它可以工作,但它只比 运行 进行所有分析快一点,因此效率不是很好。
我的问题是是否有更好的方法来清空工作队列,这样当我下次调用此代码时,就好像 CompletionService
又是新的一样。
编辑:我尝试过的另一种方法是设置 executorCompletionService = new CompletionService
,这比我的其他解决方案稍快,但仍然相当慢,绝对不是好的做法。
P.S.: 也很高兴接受任何其他可能的方式,我不喜欢使用 CompletionService
它只是我所做的最简单的事情到目前为止
此问题已得到解决,但我看到其他类似问题没有很好的答案,所以这是我的解决方案:
之前,我使用 ExecutorService
创建我的 ExecutorCompletionService(ExecutorService)
。我将 ExecutorService
切换为 ThreadPoolExecutor
,因为在支持中 ExecutorService
已经是 ThreadPoolExecutor
所有方法签名都可以通过强制转换来修复。使用 ThreadPoolExecutor
可以让你在后端有更多的自由,特别是你可以调用 threadPoolExecutor.getQueue().clear()
来清除所有等待完成的任务。最后,我需要确保“耗尽”剩余的工作任务,因此我最终的取消代码如下所示:
if (failed > maxFailures) {
executorService.getQueue().clear();
while (executorService.getActiveCount() > 0) {
executorCompletionService.poll();
}
在此代码块结束时,执行程序将准备好再次 运行。