为分配给线程池的一组任务创建超时 java
Create a timeout for a set of tasks assigned to a threadpool java
您好,我遇到了关于分配给线程池的任务超时的问题 java。
详细:
- 我已经实现了 API,其中 运行 并行查询,return 响应。
- 有一个固定的线程池是通过
Executors.newFixedThreadPool(40)
创建的。每当有人调用此 API 时,一组 10 个任务就会在此线程池上进行调度。这些任务在内部对 Mysql 执行一组查询。
- 我必须 return 以 6-7 秒的 SLA 响应 API。所以我必须为线程池上安排的所有 10 个任务创建一个超时。我知道如何使单个任务超时(它抛出一个中断的异常,尽管线程只有在它启动后完成时才会变得空闲)。
- 我也不想超载线程池。因此,我通过分配给该线程池的任务为 运行 的所有查询创建了 60 秒的超时。因此,线程可以在一分钟内自由地执行另一项任务。
问题:
有什么优雅的方法可以解决这个问题吗?
@Component
public class MyHandler {
@PostConstruct
public void init() {
/*
* Naming thread pool to identify threads in the thread dump
* */
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("my-thread-%d").build();
executorService = Executors.newFixedThreadPool("40", threadFactory);
}
@PreDestroy
public void destroy() {
executorService.shutdown();
}
public void update() {
List<Future<Boolean>> results = new ArrayList<>();
results.add(executorService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
executeQuery();
return true;
}
}));
/*
* 9 more such tasks
*/
for (Future<Boolean> result : results) {
try {
result.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Failed with unknown error", e);
}
}
}
}
executeQuery()
安排了 60 秒的超时。
您可以使用 ExecutorService.invokeAll()
到 运行 具有超时的任务集合。方法完成后(完成工作或超时),您将必须检查所有期货以查看它们是否被取消(由于超时)或完成。如果它们已完成,您将必须检查它们是否已完成,因为工作已完成,而不是因为异常(当您调用 Future.get
时)。
代码可能如下所示:
final ExecutorService service = Executors.newCachedThreadPool();
final List<Future<Double>> futures = service.invokeAll(tasks, 2, TimeUnit.SECONDS);
final List<CallableTask> tasks = Arrays.asList(new CallableTask(1, TimeUnit.SECONDS),
new CallableTask(1, TimeUnit.HOURS), new CallableTask(100, TimeUnit.MILLISECONDS),
new CallableTask(50, TimeUnit.SECONDS));
for (Future<Double> result : futures) {
if (!result.isCancelled()) {
try {
System.out.println("Result: " + result.get());
} catch (ExecutionException e) {
// Task wasn't completed because of exception, may be required to handle this case
}
}
}
在我的例子中,CallableTask 是一个 Callable 实现,它用于使代码更简单,因为提交的所有任务都是相同的。您可以使用相同的方法来简化您的代码。
我添加了 CallableTask 的示例:
public class CallableTask implements Callable<Double> {
private static AtomicInteger count = new AtomicInteger(0);
private final int timeout;
private final TimeUnit timeUnit;
private final int taskNumber = count.incrementAndGet();
public CallableTask(int timeout, TimeUnit timeUnit) {
this.timeout = timeout;
this.timeUnit = timeUnit;
}
@Override
public Double call() {
System.out.println("Starting task " + taskNumber);
try {
timeUnit.sleep(timeout);
} catch (InterruptedException e) {
System.out.println("Task interrupted: " + taskNumber);
Thread.currentThread().interrupt();
return null;
}
System.out.println("Ending task " + taskNumber);
return Math.random();
}
}
您可以使用 ExecutorSerive.invokeAll(List<Callable<T>> tasks, long timeout, TimeUnit timeUnit)
(https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#invokeAll-java.util.Collection-)。看看下面的代码示例:
package com.github.wololock;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
final class ExecutorsServiceInvokeAnyExample {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
final ExecutorService executor = Executors.newFixedThreadPool(5);
final List<Callable<String>> tasks = Arrays.asList(
() -> {
debug("This task runs for 1 second");
Thread.sleep(1000);
debug("Task completed!");
return "1";
},
() -> {
debug("This task runs for 2 seconds");
Thread.sleep(2000);
debug("Task completed!");
return "2";
},
() -> {
debug("This task runs for 3 seconds");
Thread.sleep(2999);
debug("Task completed!");
return "3";
},
() -> {
debug("This task runs for 4 seconds");
Thread.sleep(4000);
debug("Task completed!");
return "4";
},
() -> {
debug("This task runs for 5 seconds");
Thread.sleep(5000);
debug("Task completed!");
return "5";
}
);
try {
final List<Future<String>> result = executor.invokeAll(tasks, 3, TimeUnit.SECONDS);
if (result.stream().anyMatch(Future::isCancelled)) {
throw new RuntimeException("All tasks were not completed...");
}
} finally {
executor.shutdown();
}
}
private static void debug(String msg) {
System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
}
}
我们正在触发 invokeAll
5 个任务,其中最快的任务需要 1 秒才能完成,最慢的任务需要 5 秒才能完成。调用超时设置为 3 秒,只有 3 个任务会在该时间内完成。在此示例中,如果未完成所有任务,我会抛出一个 RuntimeException
- 这取决于您的业务案例,如果发生这种情况,您将采取什么行动。下面是 运行 这个例子的示例输出:
[pool-1-thread-2] This task runs for 2 seconds
[pool-1-thread-1] This task runs for 1 second
[pool-1-thread-4] This task runs for 4 seconds
[pool-1-thread-3] This task runs for 3 seconds
[pool-1-thread-5] This task runs for 5 seconds
[pool-1-thread-1] Task completed!
[pool-1-thread-2] Task completed!
[pool-1-thread-3] Task completed!
Exception in thread "main" java.lang.RuntimeException: All tasks were not completed...
如果我给了 6 秒的超时时间,那么一切都会正确完成并且不会抛出异常:
[pool-1-thread-1] This task runs for 1 second
[pool-1-thread-5] This task runs for 5 seconds
[pool-1-thread-4] This task runs for 4 seconds
[pool-1-thread-2] This task runs for 2 seconds
[pool-1-thread-3] This task runs for 3 seconds
[pool-1-thread-1] Task completed!
[pool-1-thread-2] Task completed!
[pool-1-thread-3] Task completed!
[pool-1-thread-4] Task completed!
[pool-1-thread-5] Task completed!
Process finished with exit code 0
编辑:任务超时!=数据库服务器超时
还有一件事你应该仔细考虑。正如您在问题中提到的,您的任务将执行 MySQL 查询。请记住,如果您的任务终止,这并不意味着查询执行已停止 - 它仅意味着服务器在这 5-6 秒内没有响应,但很可能仍在执行查询。在这种情况下,您可能会 运行 错误地假设任务未完成,但它花费了更多时间,但最终 MySQL 服务器查询已执行,但没有结果返回给您的任务。另一件事是,在这种情况下,您失去了对将事务提交到数据库的控制权,这在您的情况下可能至关重要。我希望它能帮助您更好地理解什么是解决您问题的最佳方法。祝你好运!
您好,我遇到了关于分配给线程池的任务超时的问题 java。
详细:
- 我已经实现了 API,其中 运行 并行查询,return 响应。
- 有一个固定的线程池是通过
Executors.newFixedThreadPool(40)
创建的。每当有人调用此 API 时,一组 10 个任务就会在此线程池上进行调度。这些任务在内部对 Mysql 执行一组查询。 - 我必须 return 以 6-7 秒的 SLA 响应 API。所以我必须为线程池上安排的所有 10 个任务创建一个超时。我知道如何使单个任务超时(它抛出一个中断的异常,尽管线程只有在它启动后完成时才会变得空闲)。
- 我也不想超载线程池。因此,我通过分配给该线程池的任务为 运行 的所有查询创建了 60 秒的超时。因此,线程可以在一分钟内自由地执行另一项任务。
问题:
有什么优雅的方法可以解决这个问题吗?
@Component
public class MyHandler {
@PostConstruct
public void init() {
/*
* Naming thread pool to identify threads in the thread dump
* */
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("my-thread-%d").build();
executorService = Executors.newFixedThreadPool("40", threadFactory);
}
@PreDestroy
public void destroy() {
executorService.shutdown();
}
public void update() {
List<Future<Boolean>> results = new ArrayList<>();
results.add(executorService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
executeQuery();
return true;
}
}));
/*
* 9 more such tasks
*/
for (Future<Boolean> result : results) {
try {
result.get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Failed with unknown error", e);
}
}
}
}
executeQuery()
安排了 60 秒的超时。
您可以使用 ExecutorService.invokeAll()
到 运行 具有超时的任务集合。方法完成后(完成工作或超时),您将必须检查所有期货以查看它们是否被取消(由于超时)或完成。如果它们已完成,您将必须检查它们是否已完成,因为工作已完成,而不是因为异常(当您调用 Future.get
时)。
代码可能如下所示:
final ExecutorService service = Executors.newCachedThreadPool();
final List<Future<Double>> futures = service.invokeAll(tasks, 2, TimeUnit.SECONDS);
final List<CallableTask> tasks = Arrays.asList(new CallableTask(1, TimeUnit.SECONDS),
new CallableTask(1, TimeUnit.HOURS), new CallableTask(100, TimeUnit.MILLISECONDS),
new CallableTask(50, TimeUnit.SECONDS));
for (Future<Double> result : futures) {
if (!result.isCancelled()) {
try {
System.out.println("Result: " + result.get());
} catch (ExecutionException e) {
// Task wasn't completed because of exception, may be required to handle this case
}
}
}
在我的例子中,CallableTask 是一个 Callable 实现,它用于使代码更简单,因为提交的所有任务都是相同的。您可以使用相同的方法来简化您的代码。
我添加了 CallableTask 的示例:
public class CallableTask implements Callable<Double> {
private static AtomicInteger count = new AtomicInteger(0);
private final int timeout;
private final TimeUnit timeUnit;
private final int taskNumber = count.incrementAndGet();
public CallableTask(int timeout, TimeUnit timeUnit) {
this.timeout = timeout;
this.timeUnit = timeUnit;
}
@Override
public Double call() {
System.out.println("Starting task " + taskNumber);
try {
timeUnit.sleep(timeout);
} catch (InterruptedException e) {
System.out.println("Task interrupted: " + taskNumber);
Thread.currentThread().interrupt();
return null;
}
System.out.println("Ending task " + taskNumber);
return Math.random();
}
}
您可以使用 ExecutorSerive.invokeAll(List<Callable<T>> tasks, long timeout, TimeUnit timeUnit)
(https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#invokeAll-java.util.Collection-)。看看下面的代码示例:
package com.github.wololock;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
final class ExecutorsServiceInvokeAnyExample {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
final ExecutorService executor = Executors.newFixedThreadPool(5);
final List<Callable<String>> tasks = Arrays.asList(
() -> {
debug("This task runs for 1 second");
Thread.sleep(1000);
debug("Task completed!");
return "1";
},
() -> {
debug("This task runs for 2 seconds");
Thread.sleep(2000);
debug("Task completed!");
return "2";
},
() -> {
debug("This task runs for 3 seconds");
Thread.sleep(2999);
debug("Task completed!");
return "3";
},
() -> {
debug("This task runs for 4 seconds");
Thread.sleep(4000);
debug("Task completed!");
return "4";
},
() -> {
debug("This task runs for 5 seconds");
Thread.sleep(5000);
debug("Task completed!");
return "5";
}
);
try {
final List<Future<String>> result = executor.invokeAll(tasks, 3, TimeUnit.SECONDS);
if (result.stream().anyMatch(Future::isCancelled)) {
throw new RuntimeException("All tasks were not completed...");
}
} finally {
executor.shutdown();
}
}
private static void debug(String msg) {
System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
}
}
我们正在触发 invokeAll
5 个任务,其中最快的任务需要 1 秒才能完成,最慢的任务需要 5 秒才能完成。调用超时设置为 3 秒,只有 3 个任务会在该时间内完成。在此示例中,如果未完成所有任务,我会抛出一个 RuntimeException
- 这取决于您的业务案例,如果发生这种情况,您将采取什么行动。下面是 运行 这个例子的示例输出:
[pool-1-thread-2] This task runs for 2 seconds
[pool-1-thread-1] This task runs for 1 second
[pool-1-thread-4] This task runs for 4 seconds
[pool-1-thread-3] This task runs for 3 seconds
[pool-1-thread-5] This task runs for 5 seconds
[pool-1-thread-1] Task completed!
[pool-1-thread-2] Task completed!
[pool-1-thread-3] Task completed!
Exception in thread "main" java.lang.RuntimeException: All tasks were not completed...
如果我给了 6 秒的超时时间,那么一切都会正确完成并且不会抛出异常:
[pool-1-thread-1] This task runs for 1 second
[pool-1-thread-5] This task runs for 5 seconds
[pool-1-thread-4] This task runs for 4 seconds
[pool-1-thread-2] This task runs for 2 seconds
[pool-1-thread-3] This task runs for 3 seconds
[pool-1-thread-1] Task completed!
[pool-1-thread-2] Task completed!
[pool-1-thread-3] Task completed!
[pool-1-thread-4] Task completed!
[pool-1-thread-5] Task completed!
Process finished with exit code 0
编辑:任务超时!=数据库服务器超时
还有一件事你应该仔细考虑。正如您在问题中提到的,您的任务将执行 MySQL 查询。请记住,如果您的任务终止,这并不意味着查询执行已停止 - 它仅意味着服务器在这 5-6 秒内没有响应,但很可能仍在执行查询。在这种情况下,您可能会 运行 错误地假设任务未完成,但它花费了更多时间,但最终 MySQL 服务器查询已执行,但没有结果返回给您的任务。另一件事是,在这种情况下,您失去了对将事务提交到数据库的控制权,这在您的情况下可能至关重要。我希望它能帮助您更好地理解什么是解决您问题的最佳方法。祝你好运!