非阻塞 Java 异步处理 - 如何限制内存使用?

Nonblocking Java Async Processing - how to constrain memory usage?

离开 Java 几年后,我很高兴看到新 java.net.http.HttpClient and in the AWS Java SDK 2.0 引入了 non-blocking 异步支持。我多年前在会议演讲中听说过响应式编程的概念,但没有太多机会在​​实践中应用这些想法。

我有一个问题似乎很适合使用这种编程风格:基本上我想通过 HTTP 下载一堆文件(比如 10,000 个)并将它们写回 S3。

我已经使用 failsafe 实现非阻塞异步 http GET 的重试,通过 S3 异步客户端将这些与上传组合起来很简单(见下图)。

但是,我不确定如何正确限制程序的内存使用:如果文件下载速度快于写回速度,则没有应用反压和防止 out-of-memory 异常的机制输出到 S3。

我熟悉针对此问题的一些传统阻塞解决方案 - 例如使用信号量来限制并发下载的数量,或者将下载写出到 S3 上传线程将从中拉出的某个有界阻塞队列。但是,如果我要使用这种阻塞机制来应用背压,那么它首先会让我质疑使用非阻塞 IO 的优势。

是否有更惯用的 "reactive" 方法来实现相同的目标?

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BackupClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
    private final HttpClient httpClient = HttpClient.newBuilder().build();
    private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();

    public runBackup(List<URI> filesToBackup) {
        List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
                .map(backupClient::submitBackup)
                .collect(Collectors.toList());

        futures.forEach(CompletableFuture::join);
    }

    private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
        return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
                .thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
                        .bucket("my-bucket")
                        .key(uri.toASCIIString())
                        .build(), AsyncRequestBody.fromString(httpResponse.body())));
    }


    private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
        final HttpRequest request = HttpRequest.newBuilder()
                .uri(uri)
                .timeout(Duration.ofMinutes(2))
                .GET()
                .build();

        final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
                .withMaxRetries(4)
                .withDelay(Duration.ofSeconds(1))
                .handleResultIf(response -> 200 != response.statusCode());

        return Failsafe.with(retryPolicy)
                .getStageAsync(context -> {
                    if (context.getAttemptCount() > 0) {
                        LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
                    }
                    return this.httpClient.sendAsync(request, handler);
                });
    }
}

既然你需要控制资源(内存)的消耗,那么Semaphore是实现这个目标的合适工具。当你想使用 non-blocking 计算时,你所需要的只是异步信号量。流行的库(rxjava、反应流)在内部使用异步信号量来构造反应流,但不将其作为单独的 class 提供。但是,当反应流的订阅者调用 Flow.Subscription.request(n), it is equivalent to Semaphore.release(n). The analogue to Semaphore.acquire() 时,它是隐藏的。它由发布者在内部调用。

这种设计方案的缺点是只能在生产者和最近的消费者之间建立资源反馈。如果有生产者和消费者链条,那么每个link的资源消耗就得单独控制,整体资源消耗就变大N倍,N是link的个数。

如果你负担得起,那么你可以使用 rxjava 或任何其他反应流库的实现。如果没有,那么您必须使用唯一的异步库,它允许用户完全访问 asynchronous Semaphore implementation : DF4J(是的,我是作者)。它不包含对您的问题的直接解决方案,但是 有一个异步网络服务器通过异步信号量限制同时连接数的例子,见ConnectionManager.java.