使用 AWS SDK for Java,版本 2 将 InputStream 异步(非阻塞)上传到 AWS s3

Upload a InputStream to AWS s3 asynchronously (non-blocking) using AWS SDK for Java, version 2

当我将 inputStream 对象同步(阻塞方式)上传到 s3 时,它起作用了。

S3Client s3Client = S3Client.builder().build();
s3Client.putObject(objectRequest, RequestBody.fromInputStream(inputStream,STREAM_SIZE));

但是当我尝试使用 S3AsyncClient 时,AsyncRequestBody 上没有 .fromInputStream 方法。

S3AsyncClient s3AsyncClient = S3AsyncClient.builder().build();
s3AsyncClient.putObject(objectRequest, AsyncRequestBody.fromInputStream(inputStream,STREAM_SIZE)); // error no method named 'fromInputStream'

而且我不能使用 .fromByteBuffer,因为它会将整个流加载到内存中,这是我不想要的。

我很感兴趣为什么 AsyncRequestBody 中没有从 InputStream 中读取的方法。还有其他选择吗?

对于使用 Kotlin 和协程的任何人:这里是一个 kotlin 包装器,它将从 InputStream 创建一个异步 AsyncRequestBody。默认情况下,包装器将 运行 在后台线程中,但您可以在协同程序中传递显式 CoroutineScope 和 运行 ,这将避免创建单独的线程。

import io.ktor.util.cio.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import software.amazon.awssdk.core.async.AsyncRequestBody
import java.io.InputStream
import java.nio.ByteBuffer
import java.util.*

@OptIn(DelicateCoroutinesApi::class)
class StreamAsyncRequestBody(
  inputStream: InputStream,
  private val coroutineScope: CoroutineScope = GlobalScope
) :
  AsyncRequestBody {
  private val inputChannel =
    inputStream.toByteReadChannel(context = coroutineScope.coroutineContext)

  override fun subscribe(subscriber: Subscriber<in ByteBuffer>) {
    subscriber.onSubscribe(object : Subscription {
      private var done: Boolean = false

      override fun request(n: Long) {
        if (!done) {
          if (inputChannel.isClosedForRead) {
            complete()
          } else {
            coroutineScope.launch {
              inputChannel.read {
                subscriber.onNext(it)
                if (inputChannel.isClosedForRead) {
                  complete()
                }
              }
            }
          }
        }
      }

      private fun complete() {
        subscriber.onComplete()
        synchronized(this) {
          done = true
        }
      }

      override fun cancel() {
        synchronized(this) {
          done = true
        }
      }
    })
  }

  override fun contentLength(): Optional<Long> = Optional.empty()
}

用法示例:

suspend fun s3Put(objectRequest: PutObjectRequest, inputStream: InputStream) = coroutineContext {
  s3Client.putObject(objectRequest, StreamAsyncRequestBody(inputStream, this)
}

如果您使用 Java,您将需要创建自己的包装器并使用不同的协程库。或者,您可以创建一个具有固定线程数的 Executor:如果您一次上传太多 运行ning,它们会互相阻塞,但它们不会创建太多线程并阻塞整个程序。


编辑: 修复了代码。我没有测试以前的版本,我测试了几次这个版本来上传并且它有效。当然,这并不意味着它没有错误 :)

经过一番研究后,我发现了以下内容:

  1. InputStream 本质上是阻塞的,因此当您从输入流中读取时,某些线程将被阻塞,如果 answer 'toByteReadChannel' 将 return 读取阻塞通道。所以考虑到性能,它有点等同于在后台线程中执行同步 S3Client.fromInputStream(),您可以通过将其包装在 CompletableFuture 中来实现。
  2. 其他“AsyncRequestBody”类型,如“FileAsyncRequestBody”,使用 'nio'(非阻塞 I/O)和回调。也许这就是为什么 AWS 团队没有在“AsyncRequestBody”中包含“fromInputStream”的原因,因为根本不可能使用完全非阻塞的方式,而且会造成混淆。
  3. 如果你想要一个高度可扩展的解决方案,最好的解决方案是不要一起使用 InputStream,找到 InputStream 的来源并使用一些支持非阻塞通道的替代方案,在我的例子中我使用了 Java Flow 并将其转换为 'Publisher' 并使用 AsyncRequestBody.fromPublisher()