从必要的同步方法调用异步方法(Vert.x、Java)

Calling async methods (Vert.x, Java) from necessarily synchronous ones

我们有一组 Java 应用程序,这些应用程序最初是使用普通的同步方法编写的,但大部分都已转换为异步方法 Vert.x(常规的 API,而不是 Rx)说得通。我们在同步代码和异步代码之间的边界上遇到了一些麻烦,尤其是当我们有一个必须同步的方法(下面解释了原因)并且我们想从中调用一个异步方法时。

之前在 Stack Overflow 上提出了许多类似的问题,但实际上所有这些问题都在 C# 上下文中,答案似乎并不适用。

除其他外,我们还使用了 Geotools 和 Apache Shiro。两者都使用他们定义的严格同步的 API 通过扩展提供定制。作为一个具体的例子,我们为 Shiro 定制的授权领域需要访问我们的用户数据存储,为此我们创建了一个异步 DAO API。我们要写的 Shiro 方法叫做 doGetAuthorizationInfo; 它应该是 return 和 AuthorizationInfo。但是似乎没有可靠的方法从异步 DAO 的另一端访问授权数据 API.

在线程不是由 Vert.x 创建的特定情况下,使用 CompletableFuture 是一个可行的解决方案:同步 doGetAuthorizationInfo 会将异步工作推到 Vert.x 线程,然后阻塞 CompletableFuture.get() 中的当前线程,直到结果可用。

不幸的是,Shiro(或 Geotools,或其他)方法可能会在 Vert.x 线程上调用。在那种情况下,阻塞当前线程是非常糟糕的:如果它是事件循环线程,那么我们就违反了黄金法则,而如果它是一个工作线程(比如,通过 Vertx.executeBlocking),那么阻塞它会阻止工作人员不再从其队列中拾取任何东西 - 这意味着阻塞将是永久性的。

这个问题有"standard"解决方案吗?在我看来,它会在任何时候出现 Vert.x 在可扩展同步库下使用。这只是人们避免的情况吗?

编辑

...更详细一点。这是来自 org.apache.shiro.realm.AuthorizingRealm 的片段:

/**
 * Retrieves the AuthorizationInfo for the given principals from the underlying data store.  When returning
 * an instance from this method, you might want to consider using an instance of
 * {@link org.apache.shiro.authz.SimpleAuthorizationInfo SimpleAuthorizationInfo}, as it is suitable in most cases.
 *
 * @param principals the primary identifying principals of the AuthorizationInfo that should be retrieved.
 * @return the AuthorizationInfo associated with this principals.
 * @see org.apache.shiro.authz.SimpleAuthorizationInfo
 */
protected abstract AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals);

我们的数据访问层有这样的方法:

void loadUserAccount(String id, Handler<AsyncResult<UserAccount>> handler);

我们如何从前者调用后者?如果我们知道 doGetAuthorizationInfo 在非 Vert.x 线程中被调用,那么我们可以这样做:

@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
    CompletableFuture<UserAccount> completable = new CompletableFuture<>();
    vertx.<UserAccount>executeBlocking(vertxFuture -> {
        loadUserAccount((String) principals.getPrimaryPrincipal(), vertxFuture);
    }, res -> {
        if (res.failed()) {
            completable.completeExceptionally(res.cause());
        } else {
            completable.complete(res.result());
        }
    });

    // Block until the Vert.x worker thread provides its result.
    UserAccount ua = completable.get();

    // Build up authorization info from the user account
    return new SimpleAuthorizationInfo(/* etc...*/);
}

但是如果在 Vert.x 线程中调用 doGetAuthorizationInfo 那么事情就完全不同了。上面的技巧会阻塞一个事件循环线程,所以这是不行的。或者,如果它是一个工作线程,那么 executeBlocking 调用会将 loadUserAccount 任务放入同一个工作线程的队列中(我相信),因此随后的 completable.get() 将永久阻塞。

我敢打赌您已经知道答案了,但希望不是这样 -- 如果调用 GeoTools 或 Shiro 需要阻止等待来自某物的响应,那么您不应该进行该调用在 Vert.x 个话题上。

您应该创建一个 ExecutorService 和一个您应该用来执行这些调用的线程池,安排每个提交的任务在完成时发送一条 Vert.x 消息。

您可以灵活调整移入线程池的块的大小。您可以将更大的东西移到调用堆栈的更高位置,而不是将这些调用紧密包装起来。您可能会根据必须更改的代码量做出此决定。由于使方法异步通常意味着改变 all 其调用堆栈中的同步方法(这是这种异步模型的不幸的基本问题),您可能希望在高堆栈。

您最终可能会得到一个适配器层,它为各种同步服务提供 Vert.x API。