将异步 API 调用与 Java 同步

Synchronizing asynchronous API calls with Java

考虑以下场景:

  1. 服务 A 调用服务 B 以完成任务。
  2. 服务 B returns "OK",并继续以异步执行任务。
  3. 由于 A 从 B 收到了 OK,因此它也 return 响应调用它的人。

我希望能够同步这个任务。服务 B 是可定制的,因此它可以在其应该执行的任务结束时进行 API 调用。我的想法是:

  1. 服务 A 调用服务 B 以完成任务。
  2. 服务 B returns "OK",并继续以异步执行任务。
  3. 服务 A 收到 B 的响应,但没有return。
  4. 服务 B 结束任务,并向 A 发送 API 调用(到某个端点等...)。
  5. 服务 A 然后接收 returns。

这只是一个例子。事实上,服务 A 是一个 Spring 启动应用程序,而服务 B 是我们在其上构建的第三方软件。

是否可以将异步 API 调用与 Java/Spring 同步?我试着在网上搜索这个,但找不到合适的东西。

因此,根据您的请求数据,我假设有一个从服务 A 发送到服务 B 的唯一 ID。

如果是这种情况,您可以将此 ID 用作关联 ID,并可以使用 CompletableFutures 实施等待策略。当服务 B 以 "OK" 响应时,服务 A 可以使用唯一 ID 作为键创建一个可完成的未来,并在此调用 get() ,它将阻塞直到调用 complete() 。当服务 B 完成其处理时,它可以调用服务 A 上的 API 结果以及相关 ID,现在可用于完成未来。

下面是一些基本代码,只是为了说明这个想法

public class ServiceA {

    private Map<String, CompletableFuture<String>> correlationStore;

    public ServiceA() {
        correlationStore = new HashMap<>();
    }

    public String performTask(String id, String data) {

        CompletableFuture<String> futureResult = new CompletableFuture<>();
        String submissionResult = callServiceB(id, data);
        if (!"OK".equalsIgnoreCase(submissionResult)) {
            return "FAILED";
        }
        //Service B has accepted the request and is busy processing the result
        correlationStore.put(id, futureResult);
        String response = null;
        try {
            //Set a timeout to whatever is sensible
            response = futureResult.get(30, TimeUnit.SECONDS); // Thread is now blocked until complete() is called
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof TimeoutException) {
                correlationStore.remove(id);
            }
            //Handle failure depending on your requirements
        }
        return response;
    }

    //This is called from API call back from Service B
    public void onResponse(String id, String responseData) {
        CompletableFuture<String> pendingResult = correlationStore.remove(id);
        if (pendingResult != null) {
            pendingResult.complete(responseData);
        } else {
            //Handle the scenario where there is not future waiting for a response
        }
    }
}

但是,使用这种方法时,有许多事情需要考虑,比如如果服务 B 从未实际回调到服务 A 并返回结果,或者更确切地说,如果您等待服务 B 超时,该怎么办响应,删除未来,然后响应在稍后阶段返回,你应该怎么做才能处理这个问题?

但这现在完全取决于您的服务 A 做了什么,以及您的具体案例是围绕失败的,即将请求的状态存储在服务 A 中,并提供查询状态的机制等。

然而,我强烈建议,根据您项目的灵活性,研究中间件队列机制 RabbitMQ/Kafka/Pulsar 等,因为它们都为基于工作队列的体系结构提供了强大的功能,并且可以提供有用的你看你的情况。