如何在 Apache 异步客户端中配置允许的挂起请求数
How to configure the number of allowed pending requests in Apache async client
我正在使用 Apache httpasyncclient 4.0.2 版开发 HTTP 客户端应用程序。
我想配置最大待处理请求数。最初我假设这个数字与最大连接数相同。我通过以下方式将其设置为 20:
final CloseableHttpAsyncClient httpclient;
在构造函数中:
final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory = new ManagedNHttpClientConnectionFactory(new DefaultHttpRequestWriterFactory(), new DefaultHttpResponseParserFactory(), HeapByteBufferAllocator.INSTANCE);
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setIoThreadCount(4)
.setConnectTimeout(30000)
.setSoTimeout(30000)
.build();
final PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(ioReactorConfig), connFactory);
final int maxConns = 20;
connManager.setDefaultMaxPerRoute(maxConns);
connManager.setMaxTotal(maxConns);
httpclient = HttpAsyncClientBuilder.create().setConnectionManager(connManager).build();
httpclient.start();
及以后:
final BasicAsyncRequestProducer requestProducer = new BasicAsyncRequestProducer(URIUtils.extractHost(URI.create(serverAddress)), request) {
@Override
public void requestCompleted(HttpContext context) {
pendings.add(callback);
logMessage(Direction.REQUEST, req);
handler.onContentWriteCompleted();
}
};
httpclient.execute(requestProducer, HttpAsyncMethods.createConsumer(), new HttpClientContext(), callback);
回调是我处理响应的地方。
作为概念证明,它失败了。确实搞定了四个线程运行,但是当我尝试同时发送20条消息时,立即只发送了8条,其余的必须等到服务器响应它们。
Apache 调试消息表明确实已创建 20 个连接。看来还要多做一些配置。
?
Apache HttpAsyncClient 维护一个无限制的请求执行队列,并且不会尝试限制待处理请求的数量。各种应用程序可能想要也可能不想限制请求速率,并且没有简单的方法来满足它们。
然而,可以使用一个简单的信号量相当容易地限制并发请求的数量。
final Semaphore semaphore = new Semaphore(maxConcurrencyLevel);
for (int i = 0; i < n; i++) {
semaphore.acquire();
this.httpclient.execute(
new BasicAsyncRequestProducer(target, request),
new MyResponseConsumer(),
new FutureCallback<HttpResponse>() {
@Override
public void completed(final HttpResponse result) {
semaphore.release();
}
@Override
public void failed(final Exception ex) {
semaphore.release();
}
@Override
public void cancelled() {
semaphore.release();
}
});
}
我正在使用 Apache httpasyncclient 4.0.2 版开发 HTTP 客户端应用程序。
我想配置最大待处理请求数。最初我假设这个数字与最大连接数相同。我通过以下方式将其设置为 20:
final CloseableHttpAsyncClient httpclient;
在构造函数中:
final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory = new ManagedNHttpClientConnectionFactory(new DefaultHttpRequestWriterFactory(), new DefaultHttpResponseParserFactory(), HeapByteBufferAllocator.INSTANCE);
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setIoThreadCount(4)
.setConnectTimeout(30000)
.setSoTimeout(30000)
.build();
final PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(ioReactorConfig), connFactory);
final int maxConns = 20;
connManager.setDefaultMaxPerRoute(maxConns);
connManager.setMaxTotal(maxConns);
httpclient = HttpAsyncClientBuilder.create().setConnectionManager(connManager).build();
httpclient.start();
及以后:
final BasicAsyncRequestProducer requestProducer = new BasicAsyncRequestProducer(URIUtils.extractHost(URI.create(serverAddress)), request) {
@Override
public void requestCompleted(HttpContext context) {
pendings.add(callback);
logMessage(Direction.REQUEST, req);
handler.onContentWriteCompleted();
}
};
httpclient.execute(requestProducer, HttpAsyncMethods.createConsumer(), new HttpClientContext(), callback);
回调是我处理响应的地方。
作为概念证明,它失败了。确实搞定了四个线程运行,但是当我尝试同时发送20条消息时,立即只发送了8条,其余的必须等到服务器响应它们。
Apache 调试消息表明确实已创建 20 个连接。看来还要多做一些配置。
?
Apache HttpAsyncClient 维护一个无限制的请求执行队列,并且不会尝试限制待处理请求的数量。各种应用程序可能想要也可能不想限制请求速率,并且没有简单的方法来满足它们。
然而,可以使用一个简单的信号量相当容易地限制并发请求的数量。
final Semaphore semaphore = new Semaphore(maxConcurrencyLevel);
for (int i = 0; i < n; i++) {
semaphore.acquire();
this.httpclient.execute(
new BasicAsyncRequestProducer(target, request),
new MyResponseConsumer(),
new FutureCallback<HttpResponse>() {
@Override
public void completed(final HttpResponse result) {
semaphore.release();
}
@Override
public void failed(final Exception ex) {
semaphore.release();
}
@Override
public void cancelled() {
semaphore.release();
}
});
}