How to stream a Vert.x request directly to a file via pipe

I am using Vert.x 4.0.3 and trying to stream the request body directly to a file. To do that, I use the following (Kotlin) code:

router.post("/upload").handler { ctx ->
    val startTime = System.currentTimeMillis()
    val response = ctx.response()
    val request = ctx.request()
    val fs = vertx.fileSystem()

    fs.open("data.bin", OpenOptions()) { res ->
        if (res.succeeded()) {
            val asyncFile = res.result()
            request.pipeTo(asyncFile).onComplete { writeResult ->
                if(writeResult.succeeded()) {
                    response.end("${System.currentTimeMillis() - startTime}")
                } else {
                    response.setStatusCode(500).end(writeResult.cause().stackTraceToString())
                }
            }
        } else {
            response.setStatusCode(500).end(res.cause().stackTraceToString())
        }
    }
}

Unfortunately, I get the following exception:

java.lang.IllegalStateException: Request has already been read
    at io.vertx.core.http.impl.Http1xServerRequest.checkEnded(Http1xServerRequest.java:628)
    at io.vertx.core.http.impl.Http1xServerRequest.endHandler(Http1xServerRequest.java:334)
    at io.vertx.core.http.impl.Http1xServerRequest.endHandler(Http1xServerRequest.java:60)
    at io.vertx.core.streams.impl.PipeImpl.<init>(PipeImpl.java:35)
    at io.vertx.core.streams.ReadStream.pipeTo(ReadStream.java:119)
    at io.vertx.ext.web.impl.HttpServerRequestWrapper.pipeTo(HttpServerRequestWrapper.java:410)
    at fileupload.AppKt$main.handle(App.kt:60)
    at fileupload.AppKt$main.handle(App.kt)
    at io.vertx.core.impl.future.FutureImpl.onSuccess(FutureImpl.java:124)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

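Since I don't do anything with the request before that point, I have no idea where it has already been read. Can someone give me some insight? Thanks!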

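This happens because fs.open is asynchronous: by the time its callback is invoked, the request body has already been fully read, so pipeTo can no longer attach its handlers to the request.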
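One way to observe this is to log whether the request has ended, both inside the route handler and inside the fs.open callback. The following is only a diagnostic sketch: the port 8080 and the println messages are assumptions for illustration, and the second value depends on timing and body size, so no particular output is guaranteed.

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.file.OpenOptions
import io.vertx.ext.web.Router

fun main() {
    val vertx = Vertx.vertx()
    val router = Router.router(vertx)

    router.post("/upload").handler { ctx ->
        val request = ctx.request()
        // State of the request when the route handler starts
        println("in handler: ended=${request.isEnded}")

        vertx.fileSystem().open("data.bin", OpenOptions()) { res ->
            // The file is opened asynchronously; a small body may have been
            // read completely in the meantime, in which case this reports true
            println("in open callback: ended=${request.isEnded}")
            ctx.response().end()
        }
    }

    // Port 8080 is an arbitrary choice for this sketch
    vertx.createHttpServer().requestHandler(router).listen(8080)
}
```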

You must pause the request before opening the file and resume it once the file has been opened:

router.post("/upload").handler { ctx ->
    val startTime = System.currentTimeMillis()
    val response = ctx.response()
    val request = ctx.request()
    val fs = vertx.fileSystem()

    // Pause the request so its body is not consumed while the file is being opened
    request.pause()

    fs.open("data.bin", OpenOptions()) { res ->

        // Resume the request now that the open call has completed
        request.resume()

        if (res.succeeded()) {
            val asyncFile = res.result()
            request.pipeTo(asyncFile).onComplete { writeResult ->
                if(writeResult.succeeded()) {
                    response.end("${System.currentTimeMillis() - startTime}")
                } else {
                    response.setStatusCode(500).end(writeResult.cause().stackTraceToString())
                }
            }
        } else {
            response.setStatusCode(500).end(res.cause().stackTraceToString())
        }
    }
}

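Vert.x provides an equivalent set of suspending functions for Kotlin. In your case, you may want to use the openAwait and pipeToAwait functions to avoid "callback hell". Your code could then look like this: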

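router.post("/upload").handler { ctx ->
    val startTime = System.currentTimeMillis()
    val response = ctx.response()
    val request = ctx.request()
    val fs = vertx.fileSystem()

    // Pause the request so its body is not consumed while the file is being opened
    request.pause()

    // Suspending functions must be called from a coroutine. This assumes the
    // handler is defined inside a CoroutineVerticle (or another CoroutineScope),
    // which is what makes launch available here.
    launch {
        val asyncFile = fs.openAwait("data.bin", OpenOptions())
        request.resume()
        request.pipeToAwait(asyncFile)

        // code for sending http response
    }
}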
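In Vert.x 4 the asynchronous methods also return a Future, and the vertx-lang-kotlin-coroutines module provides an await() extension on Future, so the same flow can be written without the generated *Await functions. The following is a minimal sketch under a few assumptions: the class name UploadVerticle, the port 8080, and the response bodies are hypothetical, and the handler is placed inside a CoroutineVerticle so that launch is in scope. The request is still paused before the file is opened, for the reason given in the first answer.

```kotlin
import io.vertx.core.file.OpenOptions
import io.vertx.ext.web.Router
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.await
import kotlinx.coroutines.launch

// Hypothetical verticle name, used only for this sketch
class UploadVerticle : CoroutineVerticle() {

    override suspend fun start() {
        val router = Router.router(vertx)

        router.post("/upload").handler { ctx ->
            val startTime = System.currentTimeMillis()
            val request = ctx.request()
            val response = ctx.response()

            // Pause before any asynchronous work so the body is not read early
            request.pause()

            // CoroutineVerticle is a CoroutineScope, so launch runs on its context
            launch {
                try {
                    val asyncFile = vertx.fileSystem()
                        .open("data.bin", OpenOptions())
                        .await()
                    // pipeTo resumes the paused request and streams it into the file
                    request.pipeTo(asyncFile).await()
                    response.end("${System.currentTimeMillis() - startTime}")
                } catch (e: Throwable) {
                    // await() rethrows the failure of the underlying Future
                    response.setStatusCode(500).end(e.stackTraceToString())
                }
            }
        }

        // Port 8080 is an arbitrary choice for this sketch
        vertx.createHttpServer().requestHandler(router).listen(8080).await()
    }
}
```

Deploying it with vertx.deployVerticle(UploadVerticle()) should be enough to try it out. A single try/catch covers both the open and the pipe step, which replaces the two separate failure branches of the callback version.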