S3 / MinIO with Java / Scala:将字节缓冲区文件块保存到对象存储
S3 / MinIO with Java / Scala: Saving byte buffers chunks of files to object storage
所以,假设我有一个 Scala Vert.x Web REST API,它通过 HTTP 多部分请求接收文件上传。但是,它不会将传入的文件数据作为单个 InputStream
接收。相反,每个文件都是作为一系列字节缓冲区接收的,这些缓冲区通过一些回调函数移交。
回调基本上是这样的:
// the callback that receives byte buffers (chunks) of the file being uploaded
// it is called multiple times until the full file has been received
upload.handler { buffer =>
// send chunk to backend
}
// the callback that gets called after the full file has been uploaded
// (i.e. after all chunks have been received)
upload.endHandler { _ =>
// do something after the file has been uploaded
}
// callback called if an exception is raised while receiving the file
upload.exceptionHandler { e =>
// do something to handle the exception
}
现在,我想使用这些回调将文件保存到 MinIO Bucket 中(如果您不熟悉,MinIO 基本上是自托管的 S3,它 API 几乎相同作为 S3 Java API).
因为我没有文件句柄,所以我需要使用putObject()
将InputStream
放入MinIO。
我目前使用 MinIO Java API 的低效解决方法如下所示:
// this is all inside the context of handling a HTTP request
val out = new PipedOutputStream()
val in = new PipedInputStream()
var size = 0
in.connect(out)
upload.handler { buffer =>
s.write(buffer.getBytes)
size += buffer.length()
}
upload.endHandler { _ =>
minioClient.putObject(
PutObjectArgs.builder()
.bucket("my-bucket")
.object("my-filename")
.stream(in, size, 50000000)
.build())
}
显然,这不是最佳选择。由于我在这里使用的是一个简单的 java.io
流,因此整个文件最终会加载到内存中。
我不想在将文件放入对象存储之前将文件保存到服务器上的磁盘。我想将它直接放入我的对象存储中。
如何使用 S3 API 和通过 upload.handler
回调提供给我的一系列字节缓冲区来完成此操作?
编辑
我应该补充一点,我正在使用 MinIO,因为我不能使用商业托管的云解决方案,例如 S3。但是,正如 MinIO 网站上提到的,我 可以 使用 Amazon 的 S3 Java SDK,同时使用 MinIO 作为我的存储解决方案。
我尝试按照 this guide on Amazon's website 将对象分块上传到 S3。
我尝试的解决方案如下所示:
context.request.uploadHandler { upload =>
println(s"Filename: ${upload.filename()}")
val partETags = new util.ArrayList[PartETag]
val initRequest = new InitiateMultipartUploadRequest("docs", "my-filekey")
val initResponse = s3Client.initiateMultipartUpload(initRequest)
upload.handler { buffer =>
println("uploading part", buffer.length())
try {
val request = new UploadPartRequest()
.withBucketName("docs")
.withKey("my-filekey")
.withPartSize(buffer.length())
.withUploadId(initResponse.getUploadId)
.withInputStream(new ByteArrayInputStream(buffer.getBytes()))
val uploadResult = s3Client.uploadPart(request)
partETags.add(uploadResult.getPartETag)
} catch {
case e: Exception => println("Exception raised: ", e)
}
}
// this gets called for EACH uploaded file sequentially
upload.endHandler { _ =>
// upload successful
println("done uploading")
try {
val compRequest = new CompleteMultipartUploadRequest("docs", "my-filekey", initResponse.getUploadId, partETags)
s3Client.completeMultipartUpload(compRequest)
} catch {
case e: Exception => println("Exception raised: ", e)
}
context.response.setStatusCode(200).end("Uploaded")
}
upload.exceptionHandler { e =>
// handle the exception
println("exception thrown", e)
}
}
}
这适用于小文件(我的测试小文件是 11 字节),但不适用于大文件。
对于大文件,upload.handler
中的进程会随着文件的继续上传而逐渐变慢。此外,upload.endHandler
永远不会被调用,并且文件在 100% 的文件上传后以某种方式继续上传。
但是,一旦我注释掉 upload.handler
中的 s3Client.uploadPart(request)
部分和 upload.endHandler
中的 s3Client.completeMultipartUpload
部分(基本上是丢弃文件而不是保存文件到对象存储),文件上传正常进行并正确终止。
我发现我做错了什么(使用 S3 客户端时)。我没有在 upload.handler
中累积字节。我需要累积字节直到缓冲区大小足以上传一部分,而不是每次收到几个字节就上传。
由于 Amazon 的 S3 客户端和 MinIO 客户端都没有满足我的要求,我决定深入研究 putObject()
的实际实现方式并制作自己的客户端。这是我想出的。
此实现特定于 Vert.X,但它可以很容易地推广到通过 while
循环并使用一对 [=17] 与内置 java.io
InputStreams 一起工作=] 流。
此实现也特定于 MinIO,但它可以很容易地适应使用 S3 客户端,因为在大多数情况下,这两个 API 是相同的。
在这个例子中,Buffer
基本上是一个围绕 ByteArray
的容器,我在这里并没有做任何特别的事情。我用一个字节数组替换它以确保它仍然可以工作,而且确实如此。
package server
import com.google.common.collect.HashMultimap
import io.minio.MinioClient
import io.minio.messages.Part
import io.vertx.core.buffer.Buffer
import io.vertx.core.streams.ReadStream
import scala.collection.mutable.ListBuffer
class CustomMinioClient(client: MinioClient) extends MinioClient(client) {
def putReadStream(bucket: String = "my-bucket",
objectName: String,
region: String = "us-east-1",
data: ReadStream[Buffer],
objectSize: Long,
contentType: String = "application/octet-stream"
) = {
val headers: HashMultimap[String, String] = HashMultimap.create()
headers.put("Content-Type", contentType)
var uploadId: String = null
try {
val parts = new ListBuffer[Part]()
val createResponse = createMultipartUpload(bucket, region, objectName, headers, null)
uploadId = createResponse.result.uploadId()
var partNumber = 1
var uploadedSize = 0
// an array to use to accumulate bytes from the incoming stream until we have enough to make a `uploadPart` request
var partBuffer = Buffer.buffer()
// S3's minimum part size is 5mb, excepting the last part
// you should probably implement your own logic for determining how big
// to make each part based off the total object size to avoid unnecessary calls to S3 to upload small parts.
val minPartSize = 5 * 1024 * 1024
data.handler { buffer =>
partBuffer.appendBuffer(buffer)
val availableSize = objectSize - uploadedSize - partBuffer.length
val isMinPartSize = partBuffer.length >= minPartSize
val isLastPart = uploadedSize + partBuffer.length == objectSize
if (isMinPartSize || isLastPart) {
val partResponse = uploadPart(
bucket,
region,
objectName,
partBuffer.getBytes,
partBuffer.length,
uploadId,
partNumber,
null,
null
)
parts.addOne(new Part(partNumber, partResponse.etag))
uploadedSize += partBuffer.length
partNumber += 1
// empty the part buffer since we have already uploaded it
partBuffer = Buffer.buffer()
}
}
data.endHandler { _ =>
completeMultipartUpload(bucket, region, objectName, uploadId, parts.toArray, null, null)
}
data.exceptionHandler { exception =>
// should also probably abort the upload here
println("Handler caught exception in custom putObject: " + exception)
}
} catch {
// and abort it here as well...
case e: Exception =>
println("Exception thrown in custom `putObject`: " + e)
abortMultipartUpload(
bucket,
region,
objectName,
uploadId,
null,
null
)
}
}
}
这一切都可以很容易地使用。
首先,设置客户端:
private val _minioClient = MinioClient.builder()
.endpoint("http://localhost:9000")
.credentials("my-username", "my-password")
.build()
private val myClient = new CustomMinioClient(_minioClient)
然后,您收到上传请求的地方:
context.request.uploadHandler { upload =>
myClient.putReadStream(objectName = upload.filename(), data = upload, objectSize = myFileSize)
context.response().setStatusCode(200).end("done")
}
此实现的唯一问题是您需要提前知道请求的文件大小。
但是,这可以很容易地按照我的方式解决,特别是如果您使用的是网络 UI。
- 在尝试上传文件之前,向服务器发送包含文件名与文件大小映射的请求。
- 该预请求应为上传生成一个唯一 ID。
- 服务器可以使用上传ID作为索引来保存filename->filesize组。 - 服务器将上传 ID 发送回客户端。
- 客户端使用上传ID发送分段上传请求
- 服务器拉取文件列表及其大小并使用它来调用
.putReadStream()
所以,假设我有一个 Scala Vert.x Web REST API,它通过 HTTP 多部分请求接收文件上传。但是,它不会将传入的文件数据作为单个 InputStream
接收。相反,每个文件都是作为一系列字节缓冲区接收的,这些缓冲区通过一些回调函数移交。
回调基本上是这样的:
// the callback that receives byte buffers (chunks) of the file being uploaded
// it is called multiple times until the full file has been received
upload.handler { buffer =>
// send chunk to backend
}
// the callback that gets called after the full file has been uploaded
// (i.e. after all chunks have been received)
upload.endHandler { _ =>
// do something after the file has been uploaded
}
// callback called if an exception is raised while receiving the file
upload.exceptionHandler { e =>
// do something to handle the exception
}
现在,我想使用这些回调将文件保存到 MinIO Bucket 中(如果您不熟悉,MinIO 基本上是自托管的 S3,它 API 几乎相同作为 S3 Java API).
因为我没有文件句柄,所以我需要使用putObject()
将InputStream
放入MinIO。
我目前使用 MinIO Java API 的低效解决方法如下所示:
// this is all inside the context of handling a HTTP request
val out = new PipedOutputStream()
val in = new PipedInputStream()
var size = 0
in.connect(out)
upload.handler { buffer =>
s.write(buffer.getBytes)
size += buffer.length()
}
upload.endHandler { _ =>
minioClient.putObject(
PutObjectArgs.builder()
.bucket("my-bucket")
.object("my-filename")
.stream(in, size, 50000000)
.build())
}
显然,这不是最佳选择。由于我在这里使用的是一个简单的 java.io
流,因此整个文件最终会加载到内存中。
我不想在将文件放入对象存储之前将文件保存到服务器上的磁盘。我想将它直接放入我的对象存储中。
如何使用 S3 API 和通过 upload.handler
回调提供给我的一系列字节缓冲区来完成此操作?
编辑
我应该补充一点,我正在使用 MinIO,因为我不能使用商业托管的云解决方案,例如 S3。但是,正如 MinIO 网站上提到的,我 可以 使用 Amazon 的 S3 Java SDK,同时使用 MinIO 作为我的存储解决方案。
我尝试按照 this guide on Amazon's website 将对象分块上传到 S3。
我尝试的解决方案如下所示:
context.request.uploadHandler { upload =>
println(s"Filename: ${upload.filename()}")
val partETags = new util.ArrayList[PartETag]
val initRequest = new InitiateMultipartUploadRequest("docs", "my-filekey")
val initResponse = s3Client.initiateMultipartUpload(initRequest)
upload.handler { buffer =>
println("uploading part", buffer.length())
try {
val request = new UploadPartRequest()
.withBucketName("docs")
.withKey("my-filekey")
.withPartSize(buffer.length())
.withUploadId(initResponse.getUploadId)
.withInputStream(new ByteArrayInputStream(buffer.getBytes()))
val uploadResult = s3Client.uploadPart(request)
partETags.add(uploadResult.getPartETag)
} catch {
case e: Exception => println("Exception raised: ", e)
}
}
// this gets called for EACH uploaded file sequentially
upload.endHandler { _ =>
// upload successful
println("done uploading")
try {
val compRequest = new CompleteMultipartUploadRequest("docs", "my-filekey", initResponse.getUploadId, partETags)
s3Client.completeMultipartUpload(compRequest)
} catch {
case e: Exception => println("Exception raised: ", e)
}
context.response.setStatusCode(200).end("Uploaded")
}
upload.exceptionHandler { e =>
// handle the exception
println("exception thrown", e)
}
}
}
这适用于小文件(我的测试小文件是 11 字节),但不适用于大文件。
对于大文件,upload.handler
中的进程会随着文件的继续上传而逐渐变慢。此外,upload.endHandler
永远不会被调用,并且文件在 100% 的文件上传后以某种方式继续上传。
但是,一旦我注释掉 upload.handler
中的 s3Client.uploadPart(request)
部分和 upload.endHandler
中的 s3Client.completeMultipartUpload
部分(基本上是丢弃文件而不是保存文件到对象存储),文件上传正常进行并正确终止。
我发现我做错了什么(使用 S3 客户端时)。我没有在 upload.handler
中累积字节。我需要累积字节直到缓冲区大小足以上传一部分,而不是每次收到几个字节就上传。
由于 Amazon 的 S3 客户端和 MinIO 客户端都没有满足我的要求,我决定深入研究 putObject()
的实际实现方式并制作自己的客户端。这是我想出的。
此实现特定于 Vert.X,但它可以很容易地推广到通过 while
循环并使用一对 [=17] 与内置 java.io
InputStreams 一起工作=] 流。
此实现也特定于 MinIO,但它可以很容易地适应使用 S3 客户端,因为在大多数情况下,这两个 API 是相同的。
在这个例子中,Buffer
基本上是一个围绕 ByteArray
的容器,我在这里并没有做任何特别的事情。我用一个字节数组替换它以确保它仍然可以工作,而且确实如此。
package server
import com.google.common.collect.HashMultimap
import io.minio.MinioClient
import io.minio.messages.Part
import io.vertx.core.buffer.Buffer
import io.vertx.core.streams.ReadStream
import scala.collection.mutable.ListBuffer
class CustomMinioClient(client: MinioClient) extends MinioClient(client) {
def putReadStream(bucket: String = "my-bucket",
objectName: String,
region: String = "us-east-1",
data: ReadStream[Buffer],
objectSize: Long,
contentType: String = "application/octet-stream"
) = {
val headers: HashMultimap[String, String] = HashMultimap.create()
headers.put("Content-Type", contentType)
var uploadId: String = null
try {
val parts = new ListBuffer[Part]()
val createResponse = createMultipartUpload(bucket, region, objectName, headers, null)
uploadId = createResponse.result.uploadId()
var partNumber = 1
var uploadedSize = 0
// an array to use to accumulate bytes from the incoming stream until we have enough to make a `uploadPart` request
var partBuffer = Buffer.buffer()
// S3's minimum part size is 5mb, excepting the last part
// you should probably implement your own logic for determining how big
// to make each part based off the total object size to avoid unnecessary calls to S3 to upload small parts.
val minPartSize = 5 * 1024 * 1024
data.handler { buffer =>
partBuffer.appendBuffer(buffer)
val availableSize = objectSize - uploadedSize - partBuffer.length
val isMinPartSize = partBuffer.length >= minPartSize
val isLastPart = uploadedSize + partBuffer.length == objectSize
if (isMinPartSize || isLastPart) {
val partResponse = uploadPart(
bucket,
region,
objectName,
partBuffer.getBytes,
partBuffer.length,
uploadId,
partNumber,
null,
null
)
parts.addOne(new Part(partNumber, partResponse.etag))
uploadedSize += partBuffer.length
partNumber += 1
// empty the part buffer since we have already uploaded it
partBuffer = Buffer.buffer()
}
}
data.endHandler { _ =>
completeMultipartUpload(bucket, region, objectName, uploadId, parts.toArray, null, null)
}
data.exceptionHandler { exception =>
// should also probably abort the upload here
println("Handler caught exception in custom putObject: " + exception)
}
} catch {
// and abort it here as well...
case e: Exception =>
println("Exception thrown in custom `putObject`: " + e)
abortMultipartUpload(
bucket,
region,
objectName,
uploadId,
null,
null
)
}
}
}
这一切都可以很容易地使用。
首先,设置客户端:
private val _minioClient = MinioClient.builder()
.endpoint("http://localhost:9000")
.credentials("my-username", "my-password")
.build()
private val myClient = new CustomMinioClient(_minioClient)
然后,您收到上传请求的地方:
context.request.uploadHandler { upload =>
myClient.putReadStream(objectName = upload.filename(), data = upload, objectSize = myFileSize)
context.response().setStatusCode(200).end("done")
}
此实现的唯一问题是您需要提前知道请求的文件大小。
但是,这可以很容易地按照我的方式解决,特别是如果您使用的是网络 UI。
- 在尝试上传文件之前,向服务器发送包含文件名与文件大小映射的请求。
- 该预请求应为上传生成一个唯一 ID。
- 服务器可以使用上传ID作为索引来保存filename->filesize组。 - 服务器将上传 ID 发送回客户端。
- 客户端使用上传ID发送分段上传请求
- 服务器拉取文件列表及其大小并使用它来调用
.putReadStream()