akka.http.scaladsl.model.ParsingException: Unexpected end of multipart entity while uploading a large file to S3 using akka http
I am trying to upload a large file (90 MB for now) to S3 using Akka HTTP with the Alpakka S3 connector. It works fine for small files (25 MB), but when I try to upload a large file (90 MB), I get the following error:
akka.http.scaladsl.model.ParsingException: Unexpected end of multipart entity
at akka.http.scaladsl.unmarshalling.MultipartUnmarshallers$$anonfun.applyOrElse(MultipartUnmarshallers.scala:108)
at akka.http.scaladsl.unmarshalling.MultipartUnmarshallers$$anonfun.applyOrElse(MultipartUnmarshallers.scala:103)
at akka.stream.impl.fusing.Collect$$anon.$anonfun$wrappedPf(Ops.scala:227)
at akka.stream.impl.fusing.SupervisedGraphStageLogic.withSupervision(Ops.scala:186)
at akka.stream.impl.fusing.Collect$$anon.onPush(Ops.scala:229)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:510)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:485)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:739)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:765)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
at akka.actor.ActorCell.invoke(ActorCell.scala:583)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Even though I get a success message at the end, the file is not uploaded completely. Only about 45-50 MB of it make it to S3.
I am using the following code:
S3Utility.scala
class S3Utility(implicit as: ActorSystem, m: Materializer) {
  private val bucketName = "test"

  def sink(fileInfo: FileInfo): Sink[ByteString, Future[MultipartUploadResult]] = {
    val fileName = fileInfo.fileName
    S3.multipartUpload(bucketName, fileName)
  }
}
The route:
def uploadLargeFile: Route =
  post {
    path("import" / "file") {
      extractMaterializer { implicit materializer =>
        withoutSizeLimit {
          fileUpload("file") {
            case (metadata, byteSource) =>
              logger.info(s"Request received to import large file: ${metadata.fileName}")
              val uploadFuture = byteSource.runWith(s3Utility.sink(metadata))
              onComplete(uploadFuture) {
                case Success(result) =>
                  logger.info(s"Successfully uploaded file")
                  complete(StatusCodes.OK)
                case Failure(ex) =>
                  logger.error("Error in uploading file", ex)
                  complete(StatusCodes.FailedDependency, ex.getMessage)
              }
          }
        }
      }
    }
  }
Any help would be appreciated. Thanks.
Strategy 1
You could break the file into smaller parts and retry failed parts, along the lines of the following sample code:
AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("some-kind-of-endpoint"))
        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("user", "pass")))
        .disableChunkedEncoding()
        .withPathStyleAccessEnabled(true)
        .build();

// Create a list of UploadPartResponse objects. You get one of these
// for each part upload.
List<PartETag> partETags = new ArrayList<PartETag>();

// Step 1: Initialize.
InitiateMultipartUploadRequest initRequest =
        new InitiateMultipartUploadRequest("bucket", "key");
InitiateMultipartUploadResult initResponse =
        s3Client.initiateMultipartUpload(initRequest);

File file = new File("filepath");
long contentLength = file.length();
long partSize = 5242880; // Set part size to 5 MB.

try {
    // Step 2: Upload parts.
    long filePosition = 0;
    for (int i = 1; filePosition < contentLength; i++) {
        // Last part can be less than 5 MB. Adjust part size.
        partSize = Math.min(partSize, (contentLength - filePosition));

        // Create a request to upload a part.
        UploadPartRequest uploadRequest = new UploadPartRequest()
                .withBucketName("bucket").withKey("key")
                .withUploadId(initResponse.getUploadId()).withPartNumber(i)
                .withFileOffset(filePosition)
                .withFile(file)
                .withPartSize(partSize);

        // Upload part and add response to our list.
        partETags.add(s3Client.uploadPart(uploadRequest).getPartETag());

        filePosition += partSize;
    }

    // Step 3: Complete.
    CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(
            "bucket", "key", initResponse.getUploadId(), partETags);
    s3Client.completeMultipartUpload(compRequest);
} catch (Exception e) {
    s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(
            "bucket", "key", initResponse.getUploadId()));
}
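Note that the catch block above aborts the whole upload rather than retrying; a real retry would re-issue just the failed UploadPartRequest. If you would rather stay with Alpakka, the same idea is available on the sink itself, since S3.multipartUpload takes a part size. A minimal sketch, assuming an Alpakka 1.x-style API where chunkSize is a parameter of S3.multipartUpload (the 10 MB figure is an arbitrary choice; S3 requires every part except the last to be at least 5 MB):

import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import scala.concurrent.Future

// Sketch: the same sink as in S3Utility, but with an explicit part size.
// Larger parts mean fewer part-upload round trips for a 90 MB file; note
// that the import path of MultipartUploadResult depends on your Alpakka version.
def chunkedSink(bucket: String, key: String): Sink[ByteString, Future[MultipartUploadResult]] =
  S3.multipartUpload(bucket, key, chunkSize = 10 * 1024 * 1024)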
Strategy 2
Increase the idle-timeout of the Akka HTTP server (setting it to infinite will do), like this:
akka.http.server.idle-timeout=infinite
This increases the time for which the server is willing to sit idle on a connection. By default it is 60 seconds; if the upload cannot complete within that window, the server closes the connection and the "Unexpected end of multipart entity" error is thrown.
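The setting can go into application.conf, or you can apply it programmatically when creating the ActorSystem. A minimal sketch of the programmatic route (the system name "upload-service" is just a placeholder):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Sketch: parse the override and fall back to the regular application.conf.
// "infinite" disables the idle timeout entirely; a long finite value such as
// "10 minutes" is a safer choice if the endpoint is publicly reachable.
val config = ConfigFactory
  .parseString("akka.http.server.idle-timeout = infinite")
  .withFallback(ConfigFactory.load())

implicit val system: ActorSystem = ActorSystem("upload-service", config)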