如何使用 Netty 编写异步 HttpServer
How to write an aynchronous HttpServer using Netty
我正在尝试创建一个简单的异步 Netty HTTP 服务器。我需要支持的关键特性是能够将包装在事件中的 HttpRequest 发送到将处理事件并生成响应的事件驱动组件(在同一 JVM 中)。此事件驱动组件在 Netty 组件的单独线程中运行。
我遵循了一些示例来构建对我有用的东西,并且当响应与处理程序在同一线程中生成时,我所用的东西是有效的
我编写/改编的自定义处理程序是:
private class WebServerHandler extends SimpleChannelInboundHandler<Object> {
String uri;
@Override
public void messageReceived(final ChannelHandlerContext ctx, final Object msg) {
if (!(msg instanceof FullHttpRequest)) return;
final FullHttpRequest request = (FullHttpRequest) msg;
uri = request.uri();
System.out.println("Handling: " + uri);
if (HttpUtil.is100ContinueExpected(request)) send100Continue(ctx);
final Handler handler = WebServer.this.getHandler(request.uri());
if (handler == null) {
writeNotFound(ctx, request);
} else {
try {
handler.handle(ctx, request);
} catch (final Throwable ex) {
ex.printStackTrace();
writeInternalServerError(ctx, request);
}
}
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
ctx.close();
}
@Override
public void channelReadComplete(final ChannelHandlerContext ctx) {
System.out.println("Completed: " + uri);
// ctx.flush();
}
}
当我将事件发送到事件驱动组件(它又是 运行 在一个单独的线程中)时,问题就出现了。在这种情况下,上面的 channelReadComplete(...)
方法在事件驱动组件处理事件之前被调用。在我用来测试解决方案的 Web 浏览器中,连接永远不会终止。
我使用的管道是:
final ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new HttpRequestDecoder(4096, 8192, 8192, false));
p.addLast("aggregator", new HttpObjectAggregator(100 * 1024 * 1024));
p.addLast("encoder", new HttpResponseEncoder());
p.addLast("handler", new WebServerHandler());
事件驱动组件是一个庞大而复杂的系统(实际上是一种编程语言),我没有包含任何信息,但我知道相关代码已被成功调用。我尝试执行以生成响应的实际代码是:
File file = new File(file_uri);
System.out.println("file: " + file.getAbsolutePath());
RandomAccessFile raf = new RandomAccessFile(file, "r");
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpUtil.setContentLength(response, raf.length());
ctx.write(response);
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, file.length()));
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
raf.close();
所以,我的问题是 - 如何在我发送响应之前停止 Netty 调用 channelReadComplete(...)
方法?
我认为您的 raf.close
调用应该在附加到文件写入的侦听器中进行。如所写,我希望您在写入调用有机会读取所有文件之前关闭文件。您的响应正确设置了内容长度,但如果您不写那么多字节,因为您提前关闭了文件,那么您的客户端将挂起等待其余部分。
您应该将编码器移到管道中的聚合器之前。聚合器可以发送编码器需要编码的响应。考虑使用 HttpServerCodec,因为它可以用一个处理程序替换您的 encoder/Decoder。
您使用 Object 作为类型扩展了 SimpleChannelInboundHandler,但随后拒绝了除 FullHttpRequest 消息之外的任何消息。如果您更改扩展中的类型,则 the parent class will take care of that check and casting for you (您使用的是 Netty 5 吗?为什么?)。
示例:
... extends SimpleChannelInboundHandler<FullHttpRequest> {
...
public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) {
我不认为 channelReadComplete
被提前调用对您的场景很重要。由于您已经在 read/response 处理程序中刷新,因此 channelReadComplete
中的刷新不是必需的。
我正在尝试创建一个简单的异步 Netty HTTP 服务器。我需要支持的关键特性是能够将包装在事件中的 HttpRequest 发送到将处理事件并生成响应的事件驱动组件(在同一 JVM 中)。此事件驱动组件在 Netty 组件的单独线程中运行。
我遵循了一些示例来构建对我有用的东西,并且当响应与处理程序在同一线程中生成时,我所用的东西是有效的
我编写/改编的自定义处理程序是:
private class WebServerHandler extends SimpleChannelInboundHandler<Object> {
String uri;
@Override
public void messageReceived(final ChannelHandlerContext ctx, final Object msg) {
if (!(msg instanceof FullHttpRequest)) return;
final FullHttpRequest request = (FullHttpRequest) msg;
uri = request.uri();
System.out.println("Handling: " + uri);
if (HttpUtil.is100ContinueExpected(request)) send100Continue(ctx);
final Handler handler = WebServer.this.getHandler(request.uri());
if (handler == null) {
writeNotFound(ctx, request);
} else {
try {
handler.handle(ctx, request);
} catch (final Throwable ex) {
ex.printStackTrace();
writeInternalServerError(ctx, request);
}
}
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
ctx.close();
}
@Override
public void channelReadComplete(final ChannelHandlerContext ctx) {
System.out.println("Completed: " + uri);
// ctx.flush();
}
}
当我将事件发送到事件驱动组件(它又是 运行 在一个单独的线程中)时,问题就出现了。在这种情况下,上面的 channelReadComplete(...)
方法在事件驱动组件处理事件之前被调用。在我用来测试解决方案的 Web 浏览器中,连接永远不会终止。
我使用的管道是:
final ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new HttpRequestDecoder(4096, 8192, 8192, false));
p.addLast("aggregator", new HttpObjectAggregator(100 * 1024 * 1024));
p.addLast("encoder", new HttpResponseEncoder());
p.addLast("handler", new WebServerHandler());
事件驱动组件是一个庞大而复杂的系统(实际上是一种编程语言),我没有包含任何信息,但我知道相关代码已被成功调用。我尝试执行以生成响应的实际代码是:
File file = new File(file_uri);
System.out.println("file: " + file.getAbsolutePath());
RandomAccessFile raf = new RandomAccessFile(file, "r");
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpUtil.setContentLength(response, raf.length());
ctx.write(response);
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, file.length()));
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
raf.close();
所以,我的问题是 - 如何在我发送响应之前停止 Netty 调用 channelReadComplete(...)
方法?
我认为您的 raf.close
调用应该在附加到文件写入的侦听器中进行。如所写,我希望您在写入调用有机会读取所有文件之前关闭文件。您的响应正确设置了内容长度,但如果您不写那么多字节,因为您提前关闭了文件,那么您的客户端将挂起等待其余部分。
您应该将编码器移到管道中的聚合器之前。聚合器可以发送编码器需要编码的响应。考虑使用 HttpServerCodec,因为它可以用一个处理程序替换您的 encoder/Decoder。
您使用 Object 作为类型扩展了 SimpleChannelInboundHandler,但随后拒绝了除 FullHttpRequest 消息之外的任何消息。如果您更改扩展中的类型,则 the parent class will take care of that check and casting for you (您使用的是 Netty 5 吗?为什么?)。
示例:
... extends SimpleChannelInboundHandler<FullHttpRequest> {
...
public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) {
我不认为 channelReadComplete
被提前调用对您的场景很重要。由于您已经在 read/response 处理程序中刷新,因此 channelReadComplete
中的刷新不是必需的。