Vert.x ReadStream<Buffer> 到 InputStream
Vert.x ReadStream<Buffer> to InputStream
我正在尝试使用 Jersey 设置 Vert.X 来处理 POST 数据(不一定是表单数据)。
球衣 ContainerRequest.setEntityStream
采用了 InputStream
,这正是我想要打造的。但是,我似乎无法在不使用 bodyHandler
或我自己的自定义方法将整个数据读入内存的情况下传递该数据,该方法执行类似的操作但限制了输入
final Buffer body = Buffer.buffer();
event
.handler(buffer -> {
if (!event.response().headWritten()) {
body.appendBuffer(buffer);
if (body.length() > 10 * 1024 * 1024) {
event.response()
.setStatusCode(REQUEST_ENTITY_TOO_LARGE.getStatusCode())
.setStatusMessage(REQUEST_ENTITY_TOO_LARGE.getReasonPhrase())
.end();
}
}
})
.endHandler(aVoid -> {
request.setEntityStream(new VertxBufferInputStream(body));
appHandler.handle(request);
});
VertxBufferInputStream
是 VertXbuffer 的简单包装器。只是为了通过避免转换为 ByteArrayInputStream() 来节省一些内存。但它有全身
我想避免全身而流。我已经尝试了一些非常棘手和糟糕的代码,但最终无法正常工作,因为它阻塞了事件循环,因为 handler
未被调用并正在等待它。
这么漂亮的问题要解决 :)
您可以在 https://github.com/jersey/jersey/blob/12e5d8bdf22bcd2676a1032ed69473cf2bbc48c7/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java#L124 获得灵感
实现 Jersey 与 Netty 集成的地方。
我相信已经解决了完全相同的问题(尽管对于不同的网络服务器,在本例中为 Netty),即:
- http 请求在事件循环(由 Netty)上处理,因此必须在不同的线程(非事件循环)上调用
ApplicationHandler.handle()
方法
- 非阻塞 API 必须转换为阻塞
InputStream
。这是在 NettyInputStream
实现的。它利用了 Netty 的 ByteBuf
可以很容易地转换为 InputStream
的事实,因此通过使用 LinkedBlockingDeque
这些 InputStreams 被转换为一个。 (仅供参考,我不是 100% 确定提供程序(事件循环线程)是否保证永远不会阻塞,这将是一个错误。)
顺便说一句,这只是对传入数据的处理。对于响应,您必须实现 OutputStream
(由 Jersey 使用)到 Vert.X 非阻塞 API.
的转换
需要两个组件。
您需要确保使用 vertx.executeBlocking
将可能阻止的任何处理分开,请参阅 https://github.com/trajano/app-ms/blob/7f1de326683473839ffe85fe711cbe719f7a0a74/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L128
您需要处理两个事件:新的数据缓冲区何时进入以及何时结束。 https://github.com/trajano/app-ms/blob/7f1de326683473839ffe85fe711cbe719f7a0a74/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L123
您需要实现一个 InputStream,该 InputStream 能够从另一个线程接受数据并在没有数据时阻塞,并且能够接收没有更多输入的消息。
https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/internal/VertxBlockingInputStream.java
我正在尝试使用 Jersey 设置 Vert.X 来处理 POST 数据(不一定是表单数据)。
球衣 ContainerRequest.setEntityStream
采用了 InputStream
,这正是我想要打造的。但是,我似乎无法在不使用 bodyHandler
或我自己的自定义方法将整个数据读入内存的情况下传递该数据,该方法执行类似的操作但限制了输入
final Buffer body = Buffer.buffer();
event
.handler(buffer -> {
if (!event.response().headWritten()) {
body.appendBuffer(buffer);
if (body.length() > 10 * 1024 * 1024) {
event.response()
.setStatusCode(REQUEST_ENTITY_TOO_LARGE.getStatusCode())
.setStatusMessage(REQUEST_ENTITY_TOO_LARGE.getReasonPhrase())
.end();
}
}
})
.endHandler(aVoid -> {
request.setEntityStream(new VertxBufferInputStream(body));
appHandler.handle(request);
});
VertxBufferInputStream
是 VertXbuffer 的简单包装器。只是为了通过避免转换为 ByteArrayInputStream() 来节省一些内存。但它有全身
我想避免全身而流。我已经尝试了一些非常棘手和糟糕的代码,但最终无法正常工作,因为它阻塞了事件循环,因为 handler
未被调用并正在等待它。
这么漂亮的问题要解决 :)
您可以在 https://github.com/jersey/jersey/blob/12e5d8bdf22bcd2676a1032ed69473cf2bbc48c7/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java#L124 获得灵感
实现 Jersey 与 Netty 集成的地方。
我相信已经解决了完全相同的问题(尽管对于不同的网络服务器,在本例中为 Netty),即:
- http 请求在事件循环(由 Netty)上处理,因此必须在不同的线程(非事件循环)上调用
ApplicationHandler.handle()
方法 - 非阻塞 API 必须转换为阻塞
InputStream
。这是在NettyInputStream
实现的。它利用了 Netty 的ByteBuf
可以很容易地转换为InputStream
的事实,因此通过使用LinkedBlockingDeque
这些 InputStreams 被转换为一个。 (仅供参考,我不是 100% 确定提供程序(事件循环线程)是否保证永远不会阻塞,这将是一个错误。)
顺便说一句,这只是对传入数据的处理。对于响应,您必须实现 OutputStream
(由 Jersey 使用)到 Vert.X 非阻塞 API.
需要两个组件。
您需要确保使用
vertx.executeBlocking
将可能阻止的任何处理分开,请参阅 https://github.com/trajano/app-ms/blob/7f1de326683473839ffe85fe711cbe719f7a0a74/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L128您需要处理两个事件:新的数据缓冲区何时进入以及何时结束。 https://github.com/trajano/app-ms/blob/7f1de326683473839ffe85fe711cbe719f7a0a74/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L123
您需要实现一个 InputStream,该 InputStream 能够从另一个线程接受数据并在没有数据时阻塞,并且能够接收没有更多输入的消息。 https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/internal/VertxBlockingInputStream.java