为分配给线程池的一组任务创建超时 java

Create a timeout for a set of tasks assigned to a threadpool java

您好,我遇到了关于分配给线程池的任务超时的问题 java。

详细:

  1. 我已经实现了 API,其中 运行 并行查询,return 响应。
  2. 有一个固定的线程池是通过Executors.newFixedThreadPool(40)创建的。每当有人调用此 API 时,一组 10 个任务就会在此线程池上进行调度。这些任务在内部对 Mysql 执行一组查询。
  3. 我必须 return 以 6-7 秒的 SLA 响应 API。所以我必须为线程池上安排的所有 10 个任务创建一个超时。我知道如何使单个任务超时(它抛出一个中断的异常,尽管线程只有在它启动后完成时才会变得空闲)。
  4. 我也不想超载线程池。因此,我通过分配给该线程池的任务为 运行 的所有查询创建了 60 秒的超时。因此,线程可以在一分钟内自由地执行另一项任务。

问题:

有什么优雅的方法可以解决这个问题吗?

@Component
public class MyHandler {
    @PostConstruct
    public void init() {

        /*
        * Naming thread pool to identify threads in the thread dump
        * */

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("my-thread-%d").build();

        executorService = Executors.newFixedThreadPool("40", threadFactory);
    }

    @PreDestroy
    public void destroy() {
        executorService.shutdown();
    }

    public void update() {
        List<Future<Boolean>> results = new ArrayList<>();
        results.add(executorService.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                executeQuery();
                return true;
            }
        }));

        /*
        * 9 more such tasks
        */


         for (Future<Boolean> result : results) {
            try {
                result.get();
            } catch (InterruptedException | ExecutionException e) {
                LOGGER.error("Failed with unknown error", e);
            }
        }
    }
}

executeQuery() 安排了 60 秒的超时。

您可以使用 ExecutorService.invokeAll() 到 运行 具有超时的任务集合。方法完成后(完成工作或超时),您将必须检查所有期货以查看它们是否被取消(由于超时)或完成。如果它们已完成,您将必须检查它们是否已完成,因为工作已完成,而不是因为异常(当您调用 Future.get 时)。

代码可能如下所示:

    final ExecutorService service = Executors.newCachedThreadPool();
    final List<Future<Double>> futures = service.invokeAll(tasks, 2, TimeUnit.SECONDS);
    final List<CallableTask> tasks = Arrays.asList(new CallableTask(1, TimeUnit.SECONDS),
            new CallableTask(1, TimeUnit.HOURS), new CallableTask(100, TimeUnit.MILLISECONDS),
            new CallableTask(50, TimeUnit.SECONDS));

    for (Future<Double> result : futures) {
        if (!result.isCancelled())  {
            try {
                System.out.println("Result: " + result.get());
            } catch (ExecutionException e) {
                // Task wasn't completed because of exception, may be required to handle this case
            }
        }
    }

在我的例子中,CallableTask 是一个 Callable 实现,它用于使代码更简单,因为提交的所有任务都是相同的。您可以使用相同的方法来简化您的代码。

我添加了 CallableTask 的示例:

    public class CallableTask implements Callable<Double> {

    private static AtomicInteger count = new AtomicInteger(0);
    private final int timeout;
    private final TimeUnit timeUnit;
    private final int taskNumber = count.incrementAndGet();

    public CallableTask(int timeout, TimeUnit timeUnit) {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }

    @Override
    public Double call() {
        System.out.println("Starting task " + taskNumber);
        try {
            timeUnit.sleep(timeout);
        } catch (InterruptedException e) {
            System.out.println("Task interrupted: " + taskNumber);
            Thread.currentThread().interrupt();
            return null;
        }
        System.out.println("Ending task " + taskNumber);
        return Math.random();
    }
}

您可以使用 ExecutorSerive.invokeAll(List<Callable<T>> tasks, long timeout, TimeUnit timeUnit) (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#invokeAll-java.util.Collection-)。看看下面的代码示例:

package com.github.wololock;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ExecutorsServiceInvokeAnyExample {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        final ExecutorService executor = Executors.newFixedThreadPool(5);

        final List<Callable<String>> tasks = Arrays.asList(
                () -> {
                    debug("This task runs for 1 second");
                    Thread.sleep(1000);
                    debug("Task completed!");
                    return "1";
                },
                () -> {
                    debug("This task runs for 2 seconds");
                    Thread.sleep(2000);
                    debug("Task completed!");
                    return "2";
                },
                () -> {
                    debug("This task runs for 3 seconds");
                    Thread.sleep(2999);
                    debug("Task completed!");
                    return "3";
                },
                () -> {
                    debug("This task runs for 4 seconds");
                    Thread.sleep(4000);
                    debug("Task completed!");
                    return "4";
                },
                () -> {
                    debug("This task runs for 5 seconds");
                    Thread.sleep(5000);
                    debug("Task completed!");
                    return "5";
                }
        );

        try {
            final List<Future<String>> result = executor.invokeAll(tasks, 3, TimeUnit.SECONDS);
            if (result.stream().anyMatch(Future::isCancelled)) {
                throw new RuntimeException("All tasks were not completed...");
            }
        } finally {
            executor.shutdown();
        }
    }

    private static void debug(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
    }
}

我们正在触发 invokeAll 5 个任务,其中最快的任务需要 1 秒才能完成,最慢的任务需要 5 秒才能完成。调用超时设置为 3 秒,只有 3 个任务会在该时间内完成。在此示例中,如果未完成所有任务,我会抛出一个 RuntimeException - 这取决于您的业务案例,如果发生这种情况,您将采取什么行动。下面是 运行 这个例子的示例输出:

[pool-1-thread-2] This task runs for 2 seconds
[pool-1-thread-1] This task runs for 1 second
[pool-1-thread-4] This task runs for 4 seconds
[pool-1-thread-3] This task runs for 3 seconds
[pool-1-thread-5] This task runs for 5 seconds
[pool-1-thread-1] Task completed!
[pool-1-thread-2] Task completed!
[pool-1-thread-3] Task completed!
Exception in thread "main" java.lang.RuntimeException: All tasks were not completed...

如果我给了 6 秒的超时时间,那么一切都会正确完成并且不会抛出异常:

[pool-1-thread-1] This task runs for 1 second
[pool-1-thread-5] This task runs for 5 seconds
[pool-1-thread-4] This task runs for 4 seconds
[pool-1-thread-2] This task runs for 2 seconds
[pool-1-thread-3] This task runs for 3 seconds
[pool-1-thread-1] Task completed!
[pool-1-thread-2] Task completed!
[pool-1-thread-3] Task completed!
[pool-1-thread-4] Task completed!
[pool-1-thread-5] Task completed!

Process finished with exit code 0

编辑:任务超时!=数据库服务器超时

还有一件事你应该仔细考虑。正如您在问题中提到的,您的任务将执行 MySQL 查询。请记住,如果您的任务终止,这并不意味着查询执行已停止 - 它仅意味着服务器在这 5-6 秒内没有响应,但很可能仍在执行查询。在这种情况下,您可能会 运行 错误地假设任务未完成,但它花费了更多时间,但最终 MySQL 服务器查询已执行,但没有结果返回给您的任务。另一件事是,在这种情况下,您失去了对将事务提交到数据库的控制权,这在您的情况下可能至关重要。我希望它能帮助您更好地理解什么是解决您问题的最佳方法。祝你好运!