从 returns 未来的服务创建 Mono/Flux 的正确方法

Proper way to create a Mono/Flux from a service that returns a future

如何正确处理从 futures 构建的 Monos?

我正在努力了解 Spring Reactive(和 Spring 5),观看所有视频并阅读我能找到的所有博客,但它们似乎都停了下来做一些比查询数据库或其他微不足道的事情稍微多一点的事情。

我正在使用新的 AWS 2.0 SDK,它在大多数情况下都使用 CompletableFuture。使用服务创建新实例,我的方法如下所示

public Mono<RunInstancesResponse> create(Instance instance) {
    RunInstancesRequest runInstancesRequest = RunInstancesRequest.builder()
            .instanceType(instance.getInstanceType())
            .imageId(instance.getImageId())
            .securityGroupIds(instance.getSecurityGroupIds())
            .keyName(instance.getKeyName())
            .minCount(1)
            .maxCount(1)
            .tagSpecifications(createTags(instance))
            .build();

    CompletableFuture<RunInstancesResponse> future = client.runInstances(runInstancesRequest);

    future.whenComplete((response, error) -> {
        response.reservation().instances().stream().map(aws -> Instance.builder()
                .imageId(aws.imageId())
                .build()
        ).forEach(instanceRepository::save);

    });

    return Mono.fromFuture(future);
}

我的理解是我几乎立即返回 RunInstancesResponse 类型的 Mono,而 future.whenComplete 会在任何时候返回它。

我从我的路由处理程序中调用它,看起来像

public Mono<ServerResponse> create(ServerRequest request) {
    return request.bodyToMono(Instance.class)
            .flatMap(createService::create)
            .flatMap(i -> ServerResponse.accepted().build());
}

现在这几乎可以按我的预期工作,但是有几个关键问题我不知道如何解决。

1.) whenComplete 从未被调用,我相信这是因为我没有订阅它。

2.) 服务器直到 whenComplete 完成(大约 2.5 秒)才响应客户端,这并不理想,因为我希望它立即响应然后更新客户端whenComplete 被调用。

我感觉我的整个服务和处理程序都是完全错误的做事方式。

我喜欢一些示例,说明我应该如何处理从具有 MonoFlux 类型的路由处理程序调用的服务中的期货。

我写了一个 open source library 来包装 SQS、SNS 和 DynamoDb,使这更容易一些。为避免只有 link 的答案,您可以对其应用以下内容:

public Mono<ChangeMessageVisibilityResult> changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
    return Mono.create(subscriber -> amazonClient.changeMessageVisibilityAsync(queueUrl, receiptHandle, visibilityTimeout, AmazonWebServiceRequestAsyncHandler.valueEmittingHandlerFor(subscriber)));
  }

传递的处理程序在两个世界之间转换:

public class AmazonWebServiceRequestAsyncHandler<RQ extends AmazonWebServiceRequest, RS> implements AsyncHandler<RQ, RS> {
    private final MonoSink<? super RS> subscriber;
    private boolean shouldEmitValue;

    private AmazonWebServiceRequestAsyncHandler(MonoSink<? super RS> subscriber, boolean shouldEmitValue) {
        this.subscriber = subscriber;
        this.shouldEmitValue = shouldEmitValue;
    }

    @Override
    public void onError(Exception exception) {
        subscriber.error(exception);
    }

    @Override
    public void onSuccess(RQ request, RS response) {
        if (shouldEmitValue) {
            subscriber.success(response);
        } else {
            subscriber.success();
        }
    }

    public static <RQ extends AmazonWebServiceRequest, RS> AsyncHandler<RQ, RS> valueEmittingHandlerFor(final MonoSink<? super RS> subscriber) {
        return new AmazonWebServiceRequestAsyncHandler<>(subscriber, true);
    }

    public static <RQ extends AmazonWebServiceRequest> AsyncHandler<RQ, Void> voidHandlerFor(MonoSink<? super Void> subscriber) {
        return new AmazonWebServiceRequestAsyncHandler<>(subscriber, false);
    }
}