通过 Kotlin 协程并发 S3 文件上传
Concurrent S3 File Upload via Kotlin Coroutines
我需要将许多文件上传到 S3,按顺序完成该工作需要几个小时。这正是 Kotlin 的新协程所擅长的,所以我想先尝试一下,而不是再次摆弄一些基于线程的执行服务。
这是我的(简化)代码:
fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking {
val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build()
for ((x, ys) in superTiles) {
val jobs = mutableListOf<Deferred<Any>>()
for ((y, superTile) in ys) {
val job = async(CommonPool) {
uploadTile(s3, x, y, superTile)
}
jobs.add(job)
}
jobs.map { it.await() }
}
}
suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) {
val json: String = "{}"
val key = "$s3Prefix/x4/$z/$x/$y.json"
s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata))
}
问题:代码仍然很慢,日志显示请求仍然按顺序执行:一个作业在创建下一个作业之前完成。仅在极少数情况下(十分之 1),我同时看到作业 运行ning。
为什么代码没有 运行 更快/并发?我该怎么办?
Kotlin 协程 excel 当你使用 asynchronous API 时,而你使用的 AmazonS3.putObject
API 是一个老式阻塞,同步 API,因此您只能获得与您正在使用的 CommonPool
中的线程数一样多的并发上传。将 uploadTile
函数标记为已修改 suspend
没有任何价值,因为它没有在其主体中使用任何暂停函数。
提高上传任务吞吐量的第一步是开始使用异步 API。我建议查看那个钱包的 Amazon S3 TransferManager。看看是否首先解决了您的问题。
Kotlin 协程旨在帮助您将异步 API 组合成易于使用的逻辑工作流。例如,通过编写以下扩展函数,可以直接将 TransferManager
的异步 API 与协程一起使用:
suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont ->
addProgressListener {
if (isDone) {
// we know it should not actually wait when done
try { cont.resume(waitForUploadResult()) }
catch (e: Throwable) { cont.resumeWithException(e) }
}
}
cont.invokeOnCompletion { abort() }
}
此扩展使您能够编写与 TransferManager
一起使用的非常流畅的代码,并且您可以重写 uploadTile
函数以与 TransferManager
一起工作,而不是使用阻塞 AmazonS3
接口:
suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) {
val json: String = "{}"
val key = "$s3Prefix/x4/$z/$x/$y.json"
tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata))
.await()
}
请注意,uploadTile
的新版本如何使用上面定义的挂起函数 await
。
我需要将许多文件上传到 S3,按顺序完成该工作需要几个小时。这正是 Kotlin 的新协程所擅长的,所以我想先尝试一下,而不是再次摆弄一些基于线程的执行服务。
这是我的(简化)代码:
fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking {
val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build()
for ((x, ys) in superTiles) {
val jobs = mutableListOf<Deferred<Any>>()
for ((y, superTile) in ys) {
val job = async(CommonPool) {
uploadTile(s3, x, y, superTile)
}
jobs.add(job)
}
jobs.map { it.await() }
}
}
suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) {
val json: String = "{}"
val key = "$s3Prefix/x4/$z/$x/$y.json"
s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata))
}
问题:代码仍然很慢,日志显示请求仍然按顺序执行:一个作业在创建下一个作业之前完成。仅在极少数情况下(十分之 1),我同时看到作业 运行ning。
为什么代码没有 运行 更快/并发?我该怎么办?
Kotlin 协程 excel 当你使用 asynchronous API 时,而你使用的 AmazonS3.putObject
API 是一个老式阻塞,同步 API,因此您只能获得与您正在使用的 CommonPool
中的线程数一样多的并发上传。将 uploadTile
函数标记为已修改 suspend
没有任何价值,因为它没有在其主体中使用任何暂停函数。
提高上传任务吞吐量的第一步是开始使用异步 API。我建议查看那个钱包的 Amazon S3 TransferManager。看看是否首先解决了您的问题。
Kotlin 协程旨在帮助您将异步 API 组合成易于使用的逻辑工作流。例如,通过编写以下扩展函数,可以直接将 TransferManager
的异步 API 与协程一起使用:
suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont ->
addProgressListener {
if (isDone) {
// we know it should not actually wait when done
try { cont.resume(waitForUploadResult()) }
catch (e: Throwable) { cont.resumeWithException(e) }
}
}
cont.invokeOnCompletion { abort() }
}
此扩展使您能够编写与 TransferManager
一起使用的非常流畅的代码,并且您可以重写 uploadTile
函数以与 TransferManager
一起工作,而不是使用阻塞 AmazonS3
接口:
suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) {
val json: String = "{}"
val key = "$s3Prefix/x4/$z/$x/$y.json"
tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata))
.await()
}
请注意,uploadTile
的新版本如何使用上面定义的挂起函数 await
。