如何在多线程环境下更好的使用ExecutorService?

How to better use ExecutorService in multithreading environment?

我需要创建一个库,其中包含同步和异步方法。

我的库的核心逻辑

客户将使用我们的库,他们将通过传递 DataKey 构建器对象来调用它。然后,我们将通过使用该 DataKey 对象构造一个 URL 并通过执行它对该 URL 进行 HTTP 客户端调用,在我们将响应作为 JSON 字符串返回后,我们将通过创建 DataResponse 对象将 JSON 字符串原样发送回我们的客户。有些客户会调用 executeSynchronous(),有些可能会调用 executeAsynchronous(),所以这就是为什么我需要在我的库中分别提供两种方法。

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

然后我的 DataClient 实现了上面的 Client 接口:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // do I need to have all threads as non-daemon or I can have daemon thread for my use case?
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            future.cancel(true); // terminating tasks that have timed out
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

简单 class 将执行实际任务:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) { // should I catch RuntimeException or just Exception here?
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

我对上述解决方案有几个问题 -

当我开始研究这个解决方案时,我并没有终止已经超时的任务。我是在给客户端报超时,但是任务一直在线程池中运行(可能会长期占用我有限的10个线程中的一个)。所以我在网上做了一些研究,我发现我可以通过使用 cancel on future 来取消那些已经超时的任务,如下所示 -

future.cancel(true);

但我想确定一下,我在 executeSynchronous 方法中取消超时任务的方式是否正确?

因为我在 Future 上调用 cancel() 如果任务仍在队列中,它将停止 运行ning,所以我不确定我在做什么是对的或不?执行此操作的正确方法是什么?

如果有更好的方法,谁能举个例子?

我们应该总是终止已经超时的任务吗?如果我们不这样做,那么我会产生什么影响?

Should I use daemon or non daemon threads for my above use case?

这取决于您是否希望这些线程阻止程序退出。当最后一个非守护线程完成时,JVM 将退出。

如果 JVM 存在,如果这些任务可以随时终止,那么它们应该是守护进程。如果你想让 JVM 等待它们,那么让它们成为非守护进程。

参见:java daemon thread and non-daemon thread

Also, I am terminating the tasks that have timed out so that it doesn't occupy one of my limited 10 threads for a long time. Does that look right the way I am doing it?

是也不是。您在 Future 上正确地调用了 cancel(),如果它仍在队列中,它将从 运行 停止它。但是,如果线程 已经 运行 任务,则取消只会中断线程。可能 restTemplate 调用是不可中断的,因此中断将被忽略。只有某些方法(如 Thread.sleep(...) 是可中断的并抛出 InterruptException。因此调用 future.cancel(true) 不会停止操作并终止线程。

参见:Thread not interrupting

你可以做的一件事是在你的 Task 对象上放置一个 cancel() 方法,强制关闭 restTemplate。您将需要对此进行试验。另一个想法是在 restTemplate 连接或 IO 上设置某种超时,这样它就不会永远等待。

如果您使用的是 Spring RestTemplate,则不会直接关闭,但您可以关闭底层连接,我相信这可能是通过 SimpleClientHttpRequestFactory 进行的,因此您将需要在基础 HttpURLConnection.

上调用 disconnect()

In my call() method, I am catching Exception. Should I catch RuntimeException there?

RuntimeException 延伸 Exception 所以你已经抓住了他们。

What is the difference if I catch Exception or RuntimeException?

捕获 Exception 捕获已检查(非运行时)异常 运行时异常。仅捕获 RuntimeException 意味着任何已定义的异常都不会被捕获,并且会被该方法抛出。

RuntimeException是特殊的异常,不需要代码检查。例如,任何代码都可以抛出 IllegalArgumentException 而无需将方法定义为 throws IllegalArgumentException。对于检查异常,如果检查异常未被调用方方法捕获或抛出,则为编译器错误,但对于 RuntimeExceptions.

则不是这样

以下是关于该主题的一个很好的回答:

  • Java: checked vs unchecked exception explanation

Should I use daemon or non daemon threads for my above use case?

视情况而定。但在这种情况下,我更喜欢守护线程,因为使用允许进程退出的客户端很方便。

Does that look right the way I am doing it?

不,不是。中断 IO 任务非常困难。也尝试在 RestTemplate 中设置超时。在这种情况下取​​消未来似乎没有意义。

What is the difference if I catch Exception or RuntimeException?

如果您在 try 块中没有检查异常,则没有区别 :) 只是因为在那种情况下只有 RuntimeExceptions 可能。

还有一个更重要的注意事项:将同步调用实现为异步 + 等待是个坏主意。它毫无意义,每次调用都会消耗线程池中的一个线程。只需创建任务实例并在当前线程中调用它!