在 Scala 中使用 AWS Java SDK V2 时处理异步调用的更好方法?
A better way to handle asynchronous calls when using the AWS Java SDK V2 in Scala?
背景
日前,AWSJavaSDK2.1
版本正式发布。主要卖点之一是它与以前版本的 SDK 相比如何处理异步调用。
我决定 运行 使用 Scala 和新的 SDK 进行一些实验,并且在尝试想出一种惯用的方法来处理 SDK 返回的 Futures 时遇到了一些困难。
问题
有什么方法可以用更少的样板代码更好、更简洁地完成这项工作?
Objective
使用 Scala 处理 Java V2 的 AWS SDK,并能够以惯用的方式处理成功和失败。
实验
创建异步 SNS 客户端并异步提交消息 500:
实验 1 - 使用由 SDK
返回的 CompletableFuture
(0 until 500).map { i =>
val future = client.publish(PublishRequest.builder().topicArn(arn).message(messageJava + i.toString).build())
future.whenComplete((response, ex) => {
val responseOption = Option(response) // Response can be null
responseOption match {
case Some(r) => println(r.messageId())
case None => println(s"There was an error ${ex.getMessage}")
}
})
}.foreach(future => future.join())
在这里,我创建了一个独特的请求并将其发布。 whenComplete
函数将响应转换为一个选项,因为该值可以为空。这很丑陋,因为处理 success/failure 的方法绑定在检查响应中的 null 上。
实验 2 - 在 Scala Future 中获取结果
(0 until 500).map { i =>
val jf = client.publish(PublishRequest.builder().topicArn(arn).message(messageScala + i.toString).build())
val sf: Future[PublishResponse] = Future { jf.get }
sf.onComplete {
case Success(response) => print(response.messageId)
case Failure(ex) => println(s"There was an error ${ex.getMessage}")
}
sf
}.foreach(Await.result(_, 5000.millis))
在这里,我在 CompletableFuture
上使用 .get()
方法,这样我就可以处理 Scala Future。
实验 3 - 使用 Scala - Java8 - Compat
库将 CompletableFuture
转换为 Future
(0 until 500).map { i =>
val f = client.publish(PublishRequest.builder().topicArn(arn).message(messageScala + i.toString).build()).toScala
f.onComplete {
case Success(response) =>
case Failure(exception) => println(exception.getMessage)
}
f
}.foreach(Await.result(_, 5000.millis))
这是迄今为止我最喜欢的实现,除了我需要使用第三方 experimental library。
观察
- 总的来说,所有这些实现的性能大致相同,
future.join()
比其他实现稍快一点。
- 这些函数初始化客户端和发布 500 条消息所花费的时间约为 2 秒
- 此代码的顺序版本用时不到 1 分钟(55 秒)
- 可以看到完整代码here
您提到您很高兴将 completablefuture 转换为 scala.future,只是您不喜欢依赖 scala-java8-compat。
在这种情况下,您可以简单地自己滚动,并且只希望 java8 scala:
object CompletableFutureOps {
implicit class CompletableFutureToScala[T](cf: CompletableFuture[T]) {
def asScala: Future[T] = {
val p = Promise[T]()
cf.whenCompleteAsync{ (result, ex) =>
if (result == null) p failure ex
else p success result
}
p.future
}
}
}
def showByExample: Unit = {
import CompletableFutureOps._
(0 until 500).map { i =>
val f = CompletableFuture.supplyAsync(() => i).asScala
f.onComplete {
case Success(response) => println("Success: " + response)
case Failure(exception) => println(exception.getMessage)
}
f
}.foreach(Await.result(_, 5000.millis))
}
背景
日前,AWSJavaSDK2.1
版本正式发布。主要卖点之一是它与以前版本的 SDK 相比如何处理异步调用。
我决定 运行 使用 Scala 和新的 SDK 进行一些实验,并且在尝试想出一种惯用的方法来处理 SDK 返回的 Futures 时遇到了一些困难。
问题
有什么方法可以用更少的样板代码更好、更简洁地完成这项工作?
Objective
使用 Scala 处理 Java V2 的 AWS SDK,并能够以惯用的方式处理成功和失败。
实验
创建异步 SNS 客户端并异步提交消息 500:
实验 1 - 使用由 SDK
返回的CompletableFuture
(0 until 500).map { i =>
val future = client.publish(PublishRequest.builder().topicArn(arn).message(messageJava + i.toString).build())
future.whenComplete((response, ex) => {
val responseOption = Option(response) // Response can be null
responseOption match {
case Some(r) => println(r.messageId())
case None => println(s"There was an error ${ex.getMessage}")
}
})
}.foreach(future => future.join())
在这里,我创建了一个独特的请求并将其发布。 whenComplete
函数将响应转换为一个选项,因为该值可以为空。这很丑陋,因为处理 success/failure 的方法绑定在检查响应中的 null 上。
实验 2 - 在 Scala Future 中获取结果
(0 until 500).map { i =>
val jf = client.publish(PublishRequest.builder().topicArn(arn).message(messageScala + i.toString).build())
val sf: Future[PublishResponse] = Future { jf.get }
sf.onComplete {
case Success(response) => print(response.messageId)
case Failure(ex) => println(s"There was an error ${ex.getMessage}")
}
sf
}.foreach(Await.result(_, 5000.millis))
在这里,我在 CompletableFuture
上使用 .get()
方法,这样我就可以处理 Scala Future。
实验 3 - 使用 Scala - Java8 - Compat
库将 CompletableFuture
转换为 Future
(0 until 500).map { i =>
val f = client.publish(PublishRequest.builder().topicArn(arn).message(messageScala + i.toString).build()).toScala
f.onComplete {
case Success(response) =>
case Failure(exception) => println(exception.getMessage)
}
f
}.foreach(Await.result(_, 5000.millis))
这是迄今为止我最喜欢的实现,除了我需要使用第三方 experimental library。
观察
- 总的来说,所有这些实现的性能大致相同,
future.join()
比其他实现稍快一点。 - 这些函数初始化客户端和发布 500 条消息所花费的时间约为 2 秒
- 此代码的顺序版本用时不到 1 分钟(55 秒)
- 可以看到完整代码here
您提到您很高兴将 completablefuture 转换为 scala.future,只是您不喜欢依赖 scala-java8-compat。
在这种情况下,您可以简单地自己滚动,并且只希望 java8 scala:
object CompletableFutureOps {
implicit class CompletableFutureToScala[T](cf: CompletableFuture[T]) {
def asScala: Future[T] = {
val p = Promise[T]()
cf.whenCompleteAsync{ (result, ex) =>
if (result == null) p failure ex
else p success result
}
p.future
}
}
}
def showByExample: Unit = {
import CompletableFutureOps._
(0 until 500).map { i =>
val f = CompletableFuture.supplyAsync(() => i).asScala
f.onComplete {
case Success(response) => println("Success: " + response)
case Failure(exception) => println(exception.getMessage)
}
f
}.foreach(Await.result(_, 5000.millis))
}