如何实现带回调的异步调用?

How to implement asynchronous call with callback?

我需要创建一个库,其中包含同步和异步方法。

我的库的核心逻辑 -

客户将使用我们的库,他们将通过传递 DataKey 构建器对象来调用它。然后,我们将通过使用该 DataKey 对象构造一个 URL 并通过执行它对 URL 进行 HTTP 客户端调用,在我们将响应作为 JSON 字符串返回后,我们将通过创建 DataResponse 对象将 JSON 字符串原样发送回我们的客户。

我会有同步方法和异步方法。有些客户会调用 executeSynchronous 方法来获得相同的功能,有些客户会调用我们的 executeAsynchronous 方法,使用 executeAsynchronous 方法,他们会在代码本身中调用 future.get .

下面是我的界面-

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey dataKey);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey dataKey);
}

下面是我的DataResponseclass-

public class DataResponse {

    private String response;
    private DataErrorEnum error;
    private DataStatusEnum status;

    // constructor here

    // and getters here
}

下面是我的DataStatusEnumclass-

public enum DataStatusEnum {
    SUCCESS, ERROR;
}

下面是我的DataErrorEnumclass-

public enum DataErrorEnum {
    NONE(200, "NONE", "Response is success."),
    SERVER_DOWN(3145, "Server Down", "some long message here which can give more details."),
    CLIENT_ERROR(3123, "Client Error", "some long message here which can give more details."),
    TIMEOUT_ON_CLIENT(3187, "Client Timeout", "some long message here which can give more details.");

    private final int code;
    private final String status;
    private final String description;

    // constructor and getters here
}

然后我的 DataClient 实现了上面的 Client 接口。

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService service = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey dataKey) {
        DataResponse dataResponse = null;

        try {
            Future<String> future = executeAsynchronous(dataKey);
            dataResponse = future.get(dataKey.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, dataKey);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, dataKey);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey dataKey) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(dataKey, restTemplate);
            future = executor.submit(task);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, dataKey);
        }

        return future;
    }
}

下面是我的简单 class,它将执行实际任务 -

public class Task implements Callable<DataResponse> {

    private DataKey dataKey;
    private RestTemplate restTemplate;

    public Task(DataKey dataKey, RestTemplate restTemplate) {
        this.dataKey = dataKey;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, dataKey);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, dataKey);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using dataKey object
    private String createURL() {
        String url = somecode;

        return url;
    }
}

问题陈述:-

正如我上面提到的,一些客户会调用 executeSynchronous 方法来获取他们在 DataKey 对象中传递的用户 ID 的数据,而一些客户会调用 executeAsynchronous 方法使用 DataKey 对象,但在后一种情况下,他们将在其代码库中执行 future.get

如果你看到我的 executeSynchronous 方法,我正在调用 executeAsynchronous 方法后执行 future.get,如果有任何 TimeoutException,那么我正在使用 PotoLogging class 这在我们公司是特定的,日志将转到这里的其他一些服务,我们用它来查看仪表板上的所有错误日志。这主要取决于我们如何用什么名称记录它,以便我们可以在仪表板中看到这些名称。

现在的问题是我们公司的客户也可以调用 executeAsynchronous 方法,但这意味着,他们将在他们的代码库中执行 future.get,这也可能导致 TimeoutException他们的代码,但我不能强迫他们以与我相同的方式登录。所以我的问题是 - 如果有 TimeoutException 有任何方法我可以获得回调,这样如果有人在他们的代码库中调用我的库的 executeAsynchronous 方法我可以这样记录它 -

PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, dataKey);

我需要这样做,这样我的图书馆才能以我们想要的方式在我们公司拥有的工具中登录 TimeoutException。 Otherwsie,我需要告诉每个客户像这样记录它,以便我们可以在我们的仪表板中看到它。如何从异步调用中获取回调并仍然利用异步的所有功能?

最好的方法是什么?

Future只是一个接口。提供一个包装由您的服务返回的实例的实现。让它将所有调用委托给实际的 Future 并用适当的 try-catch 块包装这些调用。

Future<DataResponse> wrapper = new Future<DataResponse>() {
    private final Future<DataResponse> delegate = future;

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return delegate.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return delegate.isCancelled();
    }

    @Override
    public boolean isDone() {
        return delegate.isDone();
    }

    @Override
    public DataResponse get() throws InterruptedException, ExecutionException {
        DataResponse dataResponse = null;
        try {
            delegate.get();
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, dataKey);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
        }
        return dataResponse;
    }

    @Override
    public DataResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        DataResponse dataResponse = null;
        try {
            delegate.get(timeout, unit);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, dataKey);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
        }
        return dataResponse;
    }
};
return wrapper;