Apache 的 HttpAsyncClient 对多个请求不可靠

Apache's HttpAsyncClient unreliable with multiple requests

我目前在一个项目中,我必须对一个 returns 一个 JSON 响应的休息服务执行多个并发的 http 请求。这是一个批量操作,任何时候的请求数都可能在几百到几千之间。

这就是为什么我认为拥有一个异步 http 客户端是个好主意,这样我就可以处理并发请求,这可以显着加快处理速度。我首先尝试了 ning 的 async-http-client。也许我做错了什么,因为这对我来说有点慢。 1000 个请求大约需要 10 秒。

之后我尝试了 Apache 的实现,它更快地处理 1000 个请求大约需要 4 秒。但我似乎无法获得稳定的请求。大多数时候我会得到一个包含 1000 个响应的列表(正如我所期望的那样),但有时我只是缺少一些响应,比如 1 或 2。

这是我目前的代码:

public class AsyncServiceTest {
    public AsyncServiceTest(String serviceURI) {
        this.httpClient = HttpAsyncClients.custom().setMaxConnPerRoute(100).setMaxConnTotal(20)
                .setDefaultRequestConfig(RequestConfig.custom().build()).build();

        this.objectMapper = new ObjectMapper();
        this.serviceURI = serviceURI;
    }

    private List<Object> getResults(List<String> queryStrings) throws Exception {
        try {
            httpClient.start();

            final List<HttpGet> requests = new ArrayList<>(addresses.size());
            for (String str : queryStrings) {
                requests.add(new HttpGet(buildUri(str))); // In this method we build the absolute request uri.
            }

            final CountDownLatch latch = new CountDownLatch(requests.size());
            final List<Object> responses = new ArrayList<>(requests.size());
            final List<String> stringResponses = new ArrayList<>(requests.size());

            for (final HttpGet request : requests) {
                httpClient.execute(request, new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(HttpResponse response) {
                        try {
                            stringResponses.add(IOUtils.toString(response.getEntity().getContent(), "UTF-8"));
                            latch.countDown();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Exception e) {
                        latch.countDown();
                    }

                    @Override
                    public void cancelled() {
                        latch.countDown();
                    }
                });
            }

            latch.await();

            for (String r : stringResponses) {
                responses.add(mapToLocation(r)); // Mapping some Strings to JSON in this method.
            }

            return responses;
        } finally {
            httpClient.close();
        }
    }
}

所以,从本质上讲,我想知道我的代码是否有问题(可能),或者仅仅是因为库的工作方式?因为 CountDownLatch 始终为零。或者有没有人有正确方向的指针(也许是另一个图书馆)?

这似乎是我代码中的并发问题(感谢@vanOekel)。答案是用 Vector<E> 替换 ArrayList<E>,这实际上是线程安全的。示例代码:

public class AsyncServiceTest {
    public AsyncServiceTest(String serviceURI) {
        this.httpClient = HttpAsyncClients.custom().setMaxConnPerRoute(100).setMaxConnTotal(20)
                .setDefaultRequestConfig(RequestConfig.custom().build()).build();

        this.objectMapper = new ObjectMapper();
        this.serviceURI = serviceURI;
    }

    private List<Object> getResults(List<String> queryStrings) throws Exception {
        try {
            httpClient.start();

            final CountDownLatch latch = new CountDownLatch(queryStrings.size());
            final Vector<Object> responses = new Vector<>(queryStrings.size());

            for (String str : queryStrings) {
                // buildUri: In this method we build the absolute request uri.
                httpClient.execute(new HttpGet(buildUri(str)), new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(HttpResponse response) {
                        try {
                            // mapToLocation: Mapping some Strings to JSON in this method.
                            responses.add(mapToLocation(IOUtils.toString(response.getEntity().getContent(), "UTF-8")));
                            latch.countDown();
                        } catch (IOException e) {
                            failed(e);
                        }
                    }

                    @Override
                    public void failed(Exception e) {
                        logger.error(e.getLocalizedMessage(), e);
                        latch.countDown();
                    }

                    @Override
                    public void cancelled() {
                        logger.error("Request cancelled.");
                        latch.countDown();
                    }
                });
            }

            latch.await();
            return responses;
        } finally {
            httpClient.close();
        }
    }
}

感谢所有有用的回复。如果有人对上述代码的优化有任何建议,我将很高兴听到。