Apache 的 HttpAsyncClient 对多个请求不可靠
Apache's HttpAsyncClient unreliable with multiple requests
我目前在一个项目中,我必须对一个 returns 一个 JSON 响应的休息服务执行多个并发的 http 请求。这是一个批量操作,任何时候的请求数都可能在几百到几千之间。
这就是为什么我认为拥有一个异步 http 客户端是个好主意,这样我就可以处理并发请求,这可以显着加快处理速度。我首先尝试了 ning 的 async-http-client。也许我做错了什么,因为这对我来说有点慢。 1000 个请求大约需要 10 秒。
之后我尝试了 Apache 的实现,它更快地处理 1000 个请求大约需要 4 秒。但我似乎无法获得稳定的请求。大多数时候我会得到一个包含 1000 个响应的列表(正如我所期望的那样),但有时我只是缺少一些响应,比如 1 或 2。
这是我目前的代码:
public class AsyncServiceTest {
public AsyncServiceTest(String serviceURI) {
this.httpClient = HttpAsyncClients.custom().setMaxConnPerRoute(100).setMaxConnTotal(20)
.setDefaultRequestConfig(RequestConfig.custom().build()).build();
this.objectMapper = new ObjectMapper();
this.serviceURI = serviceURI;
}
private List<Object> getResults(List<String> queryStrings) throws Exception {
try {
httpClient.start();
final List<HttpGet> requests = new ArrayList<>(addresses.size());
for (String str : queryStrings) {
requests.add(new HttpGet(buildUri(str))); // In this method we build the absolute request uri.
}
final CountDownLatch latch = new CountDownLatch(requests.size());
final List<Object> responses = new ArrayList<>(requests.size());
final List<String> stringResponses = new ArrayList<>(requests.size());
for (final HttpGet request : requests) {
httpClient.execute(request, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
stringResponses.add(IOUtils.toString(response.getEntity().getContent(), "UTF-8"));
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Exception e) {
latch.countDown();
}
@Override
public void cancelled() {
latch.countDown();
}
});
}
latch.await();
for (String r : stringResponses) {
responses.add(mapToLocation(r)); // Mapping some Strings to JSON in this method.
}
return responses;
} finally {
httpClient.close();
}
}
}
所以,从本质上讲,我想知道我的代码是否有问题(可能),或者仅仅是因为库的工作方式?因为 CountDownLatch 始终为零。或者有没有人有正确方向的指针(也许是另一个图书馆)?
这似乎是我代码中的并发问题(感谢@vanOekel)。答案是用 Vector<E>
替换 ArrayList<E>
,这实际上是线程安全的。示例代码:
public class AsyncServiceTest {
public AsyncServiceTest(String serviceURI) {
this.httpClient = HttpAsyncClients.custom().setMaxConnPerRoute(100).setMaxConnTotal(20)
.setDefaultRequestConfig(RequestConfig.custom().build()).build();
this.objectMapper = new ObjectMapper();
this.serviceURI = serviceURI;
}
private List<Object> getResults(List<String> queryStrings) throws Exception {
try {
httpClient.start();
final CountDownLatch latch = new CountDownLatch(queryStrings.size());
final Vector<Object> responses = new Vector<>(queryStrings.size());
for (String str : queryStrings) {
// buildUri: In this method we build the absolute request uri.
httpClient.execute(new HttpGet(buildUri(str)), new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
// mapToLocation: Mapping some Strings to JSON in this method.
responses.add(mapToLocation(IOUtils.toString(response.getEntity().getContent(), "UTF-8")));
latch.countDown();
} catch (IOException e) {
failed(e);
}
}
@Override
public void failed(Exception e) {
logger.error(e.getLocalizedMessage(), e);
latch.countDown();
}
@Override
public void cancelled() {
logger.error("Request cancelled.");
latch.countDown();
}
});
}
latch.await();
return responses;
} finally {
httpClient.close();
}
}
}
感谢所有有用的回复。如果有人对上述代码的优化有任何建议,我将很高兴听到。
我目前在一个项目中,我必须对一个 returns 一个 JSON 响应的休息服务执行多个并发的 http 请求。这是一个批量操作,任何时候的请求数都可能在几百到几千之间。
这就是为什么我认为拥有一个异步 http 客户端是个好主意,这样我就可以处理并发请求,这可以显着加快处理速度。我首先尝试了 ning 的 async-http-client。也许我做错了什么,因为这对我来说有点慢。 1000 个请求大约需要 10 秒。
之后我尝试了 Apache 的实现,它更快地处理 1000 个请求大约需要 4 秒。但我似乎无法获得稳定的请求。大多数时候我会得到一个包含 1000 个响应的列表(正如我所期望的那样),但有时我只是缺少一些响应,比如 1 或 2。
这是我目前的代码:
public class AsyncServiceTest {
public AsyncServiceTest(String serviceURI) {
this.httpClient = HttpAsyncClients.custom().setMaxConnPerRoute(100).setMaxConnTotal(20)
.setDefaultRequestConfig(RequestConfig.custom().build()).build();
this.objectMapper = new ObjectMapper();
this.serviceURI = serviceURI;
}
private List<Object> getResults(List<String> queryStrings) throws Exception {
try {
httpClient.start();
final List<HttpGet> requests = new ArrayList<>(addresses.size());
for (String str : queryStrings) {
requests.add(new HttpGet(buildUri(str))); // In this method we build the absolute request uri.
}
final CountDownLatch latch = new CountDownLatch(requests.size());
final List<Object> responses = new ArrayList<>(requests.size());
final List<String> stringResponses = new ArrayList<>(requests.size());
for (final HttpGet request : requests) {
httpClient.execute(request, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
stringResponses.add(IOUtils.toString(response.getEntity().getContent(), "UTF-8"));
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Exception e) {
latch.countDown();
}
@Override
public void cancelled() {
latch.countDown();
}
});
}
latch.await();
for (String r : stringResponses) {
responses.add(mapToLocation(r)); // Mapping some Strings to JSON in this method.
}
return responses;
} finally {
httpClient.close();
}
}
}
所以,从本质上讲,我想知道我的代码是否有问题(可能),或者仅仅是因为库的工作方式?因为 CountDownLatch 始终为零。或者有没有人有正确方向的指针(也许是另一个图书馆)?
这似乎是我代码中的并发问题(感谢@vanOekel)。答案是用 Vector<E>
替换 ArrayList<E>
,这实际上是线程安全的。示例代码:
public class AsyncServiceTest {
public AsyncServiceTest(String serviceURI) {
this.httpClient = HttpAsyncClients.custom().setMaxConnPerRoute(100).setMaxConnTotal(20)
.setDefaultRequestConfig(RequestConfig.custom().build()).build();
this.objectMapper = new ObjectMapper();
this.serviceURI = serviceURI;
}
private List<Object> getResults(List<String> queryStrings) throws Exception {
try {
httpClient.start();
final CountDownLatch latch = new CountDownLatch(queryStrings.size());
final Vector<Object> responses = new Vector<>(queryStrings.size());
for (String str : queryStrings) {
// buildUri: In this method we build the absolute request uri.
httpClient.execute(new HttpGet(buildUri(str)), new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
// mapToLocation: Mapping some Strings to JSON in this method.
responses.add(mapToLocation(IOUtils.toString(response.getEntity().getContent(), "UTF-8")));
latch.countDown();
} catch (IOException e) {
failed(e);
}
}
@Override
public void failed(Exception e) {
logger.error(e.getLocalizedMessage(), e);
latch.countDown();
}
@Override
public void cancelled() {
logger.error("Request cancelled.");
latch.countDown();
}
});
}
latch.await();
return responses;
} finally {
httpClient.close();
}
}
}
感谢所有有用的回复。如果有人对上述代码的优化有任何建议,我将很高兴听到。