如何通过管道将 Vertx 请求直接流式传输到文件
How to stream Vertx request directly to file via pipe
我正在使用 Vertx。 4.0.3 并尝试将请求正文直接流式传输到文件。为此,我使用以下 (Kotlin) 代码:
router.post("/upload").handler { ctx ->
val startTime = System.currentTimeMillis()
val response = ctx.response()
val request = ctx.request()
val fs = vertx.fileSystem()
fs.open("data.bin", OpenOptions()) { res ->
if (res.succeeded()) {
val asyncFile = res.result()
request.pipeTo(asyncFile).onComplete { writeResult ->
if(writeResult.succeeded()) {
response.end("${System.currentTimeMillis() - startTime}")
} else {
response.setStatusCode(500).end(res.cause().stackTraceToString())
}
}
} else {
response.setStatusCode(500).end(res.cause().stackTraceToString())
}
}
}
不幸的是,我遇到了这样的异常:
java.lang.IllegalStateException: Request has already been read
at io.vertx.core.http.impl.Http1xServerRequest.checkEnded(Http1xServerRequest.java:628)
at io.vertx.core.http.impl.Http1xServerRequest.endHandler(Http1xServerRequest.java:334)
at io.vertx.core.http.impl.Http1xServerRequest.endHandler(Http1xServerRequest.java:60)
at io.vertx.core.streams.impl.PipeImpl.<init>(PipeImpl.java:35)
at io.vertx.core.streams.ReadStream.pipeTo(ReadStream.java:119)
at io.vertx.ext.web.impl.HttpServerRequestWrapper.pipeTo(HttpServerRequestWrapper.java:410)
at fileupload.AppKt$main.handle(App.kt:60)
at fileupload.AppKt$main.handle(App.kt)
at io.vertx.core.impl.future.FutureImpl.onSuccess(FutureImpl.java:124)
at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess[=11=](FutureBase.java:54)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
因为我对请求没有做任何事情,所以我不知道我的请求在哪里已经被读取了。有人可以给我一些见解吗?谢谢!
发生这种情况是因为在 fs.open
的回调被调用时,请求已经被完全读取。
您必须在打开文件之前暂停请求并在以下时间后恢复:
router.post("/upload").handler { ctx ->
val startTime = System.currentTimeMillis()
val response = ctx.response()
val request = ctx.request()
val fs = vertx.fileSystem()
// Pause
request.pause()
fs.open("data.bin", OpenOptions()) { res ->
// Resume
request.resume()
if (res.succeeded()) {
val asyncFile = res.result()
request.pipeTo(asyncFile).onComplete { writeResult ->
if(writeResult.succeeded()) {
response.end("${System.currentTimeMillis() - startTime}")
} else {
response.setStatusCode(500).end(res.cause().stackTraceToString())
}
}
} else {
response.setStatusCode(500).end(res.cause().stackTraceToString())
}
}
}
Vert.x 为 Kotlin 提供了一组等效的挂起函数。在您的情况下,您可能希望实现等效的 openAwait
和 pipeToAwait
函数以避免“回调地狱”。现在您的代码可能如下所示:
router.post("/upload").handler { ctx ->
val startTime = System.currentTimeMillis()
val response = ctx.response()
val request = ctx.request()
val fs = vertx.fileSystem()
val asyncFile = fs.openAwait("data.bin", OpenOptions())
val result = request.pipeToAwait(asyncFile)
// code for sending http response
}
我正在使用 Vertx。 4.0.3 并尝试将请求正文直接流式传输到文件。为此,我使用以下 (Kotlin) 代码:
router.post("/upload").handler { ctx ->
val startTime = System.currentTimeMillis()
val response = ctx.response()
val request = ctx.request()
val fs = vertx.fileSystem()
fs.open("data.bin", OpenOptions()) { res ->
if (res.succeeded()) {
val asyncFile = res.result()
request.pipeTo(asyncFile).onComplete { writeResult ->
if(writeResult.succeeded()) {
response.end("${System.currentTimeMillis() - startTime}")
} else {
response.setStatusCode(500).end(res.cause().stackTraceToString())
}
}
} else {
response.setStatusCode(500).end(res.cause().stackTraceToString())
}
}
}
不幸的是,我遇到了这样的异常:
java.lang.IllegalStateException: Request has already been read
at io.vertx.core.http.impl.Http1xServerRequest.checkEnded(Http1xServerRequest.java:628)
at io.vertx.core.http.impl.Http1xServerRequest.endHandler(Http1xServerRequest.java:334)
at io.vertx.core.http.impl.Http1xServerRequest.endHandler(Http1xServerRequest.java:60)
at io.vertx.core.streams.impl.PipeImpl.<init>(PipeImpl.java:35)
at io.vertx.core.streams.ReadStream.pipeTo(ReadStream.java:119)
at io.vertx.ext.web.impl.HttpServerRequestWrapper.pipeTo(HttpServerRequestWrapper.java:410)
at fileupload.AppKt$main.handle(App.kt:60)
at fileupload.AppKt$main.handle(App.kt)
at io.vertx.core.impl.future.FutureImpl.onSuccess(FutureImpl.java:124)
at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess[=11=](FutureBase.java:54)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
因为我对请求没有做任何事情,所以我不知道我的请求在哪里已经被读取了。有人可以给我一些见解吗?谢谢!
发生这种情况是因为在 fs.open
的回调被调用时,请求已经被完全读取。
您必须在打开文件之前暂停请求并在以下时间后恢复:
router.post("/upload").handler { ctx ->
val startTime = System.currentTimeMillis()
val response = ctx.response()
val request = ctx.request()
val fs = vertx.fileSystem()
// Pause
request.pause()
fs.open("data.bin", OpenOptions()) { res ->
// Resume
request.resume()
if (res.succeeded()) {
val asyncFile = res.result()
request.pipeTo(asyncFile).onComplete { writeResult ->
if(writeResult.succeeded()) {
response.end("${System.currentTimeMillis() - startTime}")
} else {
response.setStatusCode(500).end(res.cause().stackTraceToString())
}
}
} else {
response.setStatusCode(500).end(res.cause().stackTraceToString())
}
}
}
Vert.x 为 Kotlin 提供了一组等效的挂起函数。在您的情况下,您可能希望实现等效的 openAwait
和 pipeToAwait
函数以避免“回调地狱”。现在您的代码可能如下所示:
router.post("/upload").handler { ctx ->
val startTime = System.currentTimeMillis()
val response = ctx.response()
val request = ctx.request()
val fs = vertx.fileSystem()
val asyncFile = fs.openAwait("data.bin", OpenOptions())
val result = request.pipeToAwait(asyncFile)
// code for sending http response
}