如何在 Apache 异步客户端中配置允许的挂起请求数

How to configure the number of allowed pending requests in Apache async client

我正在使用 Apache httpasyncclient 4.0.2 版开发 HTTP 客户端应用程序。

我想配置最大待处理请求数。最初我假设这个数字与最大连接数相同。我通过以下方式将其设置为 20:

    final CloseableHttpAsyncClient httpclient;

在构造函数中:

    final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory = new ManagedNHttpClientConnectionFactory(new DefaultHttpRequestWriterFactory(), new DefaultHttpResponseParserFactory(), HeapByteBufferAllocator.INSTANCE);
    final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
            .setIoThreadCount(4)
            .setConnectTimeout(30000)
            .setSoTimeout(30000)
            .build();
    final PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(ioReactorConfig), connFactory);
    final int maxConns = 20;
    connManager.setDefaultMaxPerRoute(maxConns);
    connManager.setMaxTotal(maxConns);
    httpclient = HttpAsyncClientBuilder.create().setConnectionManager(connManager).build();
    httpclient.start();

及以后:

    final BasicAsyncRequestProducer requestProducer = new BasicAsyncRequestProducer(URIUtils.extractHost(URI.create(serverAddress)), request) {
        @Override
        public void requestCompleted(HttpContext context) {
            pendings.add(callback);
            logMessage(Direction.REQUEST, req);
            handler.onContentWriteCompleted();
        }
    };
    httpclient.execute(requestProducer, HttpAsyncMethods.createConsumer(), new HttpClientContext(), callback);

回调是我处理响应的地方。

作为概念证明,它失败了。确实搞定了四个线程运行,但是当我尝试同时发送20条消息时,立即只发送了8条,其余的必须等到服务器响应它们。

Apache 调试消息表明确实已创建 20 个连接。看来还要多做一些配置。

?

Apache HttpAsyncClient 维护一个无限制的请求执行队列,并且不会尝试限制待处理请求的数量。各种应用程序可能想要也可能不想限制请求速率,并且没有简单的方法来满足它们。

然而,可以使用一个简单的信号量相当容易地限制并发请求的数量。

final Semaphore semaphore = new Semaphore(maxConcurrencyLevel);
for (int i = 0; i < n; i++) {

    semaphore.acquire();
    this.httpclient.execute(
            new BasicAsyncRequestProducer(target, request),
            new MyResponseConsumer(),
            new FutureCallback<HttpResponse>() {

                @Override
                public void completed(final HttpResponse result) {
                    semaphore.release();
                }

                @Override
                public void failed(final Exception ex) {
                    semaphore.release();
                }

                @Override
                public void cancelled() {
                    semaphore.release();
                }

            });
}