Vert.x ReadStream<Buffer> 到 InputStream

Vert.x ReadStream<Buffer> to InputStream

我正在尝试使用 Jersey 设置 Vert.X 来处理 POST 数据(不一定是表单数据)。

球衣 ContainerRequest.setEntityStream 采用了 InputStream,这正是我想要打造的。但是,我似乎无法在不使用 bodyHandler 或我自己的自定义方法将整个数据读入内存的情况下传递该数据,该方法执行类似的操作但限制了输入

    final Buffer body = Buffer.buffer();
    event
        .handler(buffer -> {
            if (!event.response().headWritten()) {
                body.appendBuffer(buffer);
                if (body.length() > 10 * 1024 * 1024) {
                    event.response()
                        .setStatusCode(REQUEST_ENTITY_TOO_LARGE.getStatusCode())
                        .setStatusMessage(REQUEST_ENTITY_TOO_LARGE.getReasonPhrase())
                        .end();
                }
            }
        })
        .endHandler(aVoid -> {
            request.setEntityStream(new VertxBufferInputStream(body));
            appHandler.handle(request);
        });

VertxBufferInputStream 是 VertXbuffer 的简单包装器。只是为了通过避免转换为 ByteArrayInputStream() 来节省一些内存。但它有全身

我想避免全身而流。我已经尝试了一些非常棘手和糟糕的代码,但最终无法正常工作,因为它阻塞了事件循环,因为 handler 未被调用并正在等待它。

这么漂亮的问题要解决 :)
您可以在 https://github.com/jersey/jersey/blob/12e5d8bdf22bcd2676a1032ed69473cf2bbc48c7/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java#L124 获得灵感 实现 Jersey 与 Netty 集成的地方。

我相信已经解决了完全相同的问题(尽管对于不同的网络服务器,在本例中为 Netty),即:

  1. http 请求在事件循环(由 Netty)上处理,因此必须在不同的线程(非事件循环)上调用 ApplicationHandler.handle() 方法
  2. 非阻塞 API 必须转换为阻塞 InputStream。这是在 NettyInputStream 实现的。它利用了 Netty 的 ByteBuf 可以很容易地转换为 InputStream 的事实,因此通过使用 LinkedBlockingDeque 这些 InputStreams 被转换为一个。 (仅供参考,我不是 100% 确定提供程序(事件循环线程)是否保证永远不会阻塞,这将是一个错误。)

顺便说一句,这只是对传入数据的处理。对于响应,您必须实现 OutputStream(由 Jersey 使用)到 Vert.X 非阻塞 API.

的转换

需要两个组件。

  1. 您需要确保使用 vertx.executeBlocking 将可能阻止的任何处理分开,请参阅 https://github.com/trajano/app-ms/blob/7f1de326683473839ffe85fe711cbe719f7a0a74/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L128

  2. 您需要处理两个事件:新的数据缓冲区何时进入以及何时结束。 https://github.com/trajano/app-ms/blob/7f1de326683473839ffe85fe711cbe719f7a0a74/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L123

  3. 您需要实现一个 InputStream,该 InputStream 能够从另一个线程接受数据并在没有数据时阻塞,并且能够接收没有更多输入的消息。 https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/internal/VertxBlockingInputStream.java