Java / 由回调驱动的 Scala Future

Java / Scala Future driven by a callback

短版:

如何创建一个 Promise<Result> 在触发回调时完成的?

长版:

我正在开发一个处理第三方 SOAP 服务的应用程序。来自用户的请求同时委托给多个 SOAP 服务,汇总结果并发回给用户。

系统需要可扩展,并且应该允许多个并发用户。由于每个用户请求最终会触发大约 10 次 Web 服务调用,并且每次调用阻塞约 1 秒,因此系统需要设计为非阻塞 I/O.

我在这个系统的 Play Framework (Java) 中使用 Apache CXF。我已设法生成异步 WS 客户端代理并启用异步传输。我无法弄清楚的是,当我委托给多个 Web 服务代理并且结果将作为回调获得时,如何 return 播放线程的未来。

选项 1: 使用异步方法调用 returning Java 未来。

如本 scala.concurrent.Future wrapper for java.util.concurrent.Future 线程中所述,我们无法将 Java Future 转换为 Scala Future。从 Future 获得结果的唯一方法是执行 Future.get() 来阻止调用者。由于 CXF 生成的代理 return Java 未来,此选项被排除。

选项 2: 使用 Scala Future。

由于 CXF 生成了代理接口,我不确定是否有任何方法可以干预并且 return Scala Future(AFAIK Akka 使用 Scala Futures)而不是 Java Future?

选项 3: 使用回调方法。

CXF 生成的异步方法 return Java Future 也接受一个回调对象,我想它会在结果准备好时提供回调。要使用这种方法,我需要 return 一个 Future,它会等到我收到回调。

我认为 选项 3 最有希望,尽管我不知道如何 return 一个将在收到回调时完成的 Promise。我可能有一个线程在 while(true) 中等待,并在中间等待直到结果可用。同样,我不知道如何在不阻塞线程的情况下进入 wait

简而言之,我正在尝试构建一个进行大量 SOAP Web 服务调用的系统,其中每个调用都会阻塞很长时间。在大量并发 Web 服务调用的情况下,系统可能很容易 运行 线程不足。我正在努力寻找一种基于非阻塞 I/O 的解决方案,它可以同时允许许多正在进行的 Web 服务调用。

选项 3 看起来不错 :) 开始的几个导入...

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

并且,为了说明这一点,这里有一个接受回调的模拟 CXF API:

def fetch(url: String, callback: String => Unit) = {
  callback(s"results for $url")
}

创建承诺,调用 API 并使用承诺作为回调:

val promise = Promise[String]
fetch("http://corp/api", result => promise.success(result))

然后您可以将 promise.future 这是 Future 的一个实例带入您的 Play 应用程序。

要测试它,您可以这样做:

Await.result(promise.future, Duration.Inf)

这将阻止等待结果,此时您应该在控制台中看到 "results for http://corp/api"。