如何使用 Netty 编写异步 HttpServer

How to write an aynchronous HttpServer using Netty

我正在尝试创建一个简单的异步 Netty HTTP 服务器。我需要支持的关键特性是能够将包装在事件中的 HttpRequest 发送到将处理事件并生成响应的事件驱动组件(在同一 JVM 中)。此事件驱动组件在 Netty 组件的单独线程中运行。

我遵循了一些示例来构建对我有用的东西,并且当响应与处理程序在同一线程中生成时,我所用的东西是有效的

我编写/改编的自定义处理程序是:

    private class WebServerHandler extends SimpleChannelInboundHandler<Object> {
        String uri;
        @Override
        public void messageReceived(final ChannelHandlerContext ctx, final Object msg) {
            if (!(msg instanceof FullHttpRequest)) return;

            final FullHttpRequest request = (FullHttpRequest) msg;

            uri = request.uri();
            System.out.println("Handling: " + uri);
            if (HttpUtil.is100ContinueExpected(request)) send100Continue(ctx);

            final Handler handler = WebServer.this.getHandler(request.uri());
            if (handler == null) {
                writeNotFound(ctx, request);
            } else {
                try {
                    handler.handle(ctx, request);
                } catch (final Throwable ex) {
                    ex.printStackTrace();
                    writeInternalServerError(ctx, request);
                }
            }
        }

        @Override
        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
            ctx.close();
        }

        @Override
        public void channelReadComplete(final ChannelHandlerContext ctx) {
            System.out.println("Completed: " + uri);
//          ctx.flush();
        }
    }

当我将事件发送到事件驱动组件(它又是 运行 在一个单独的线程中)时,问题就出现了。在这种情况下,上面的 channelReadComplete(...) 方法在事件驱动组件处理事件之前被调用。在我用来测试解决方案的 Web 浏览器中,连接永远不会终止。

我使用的管道是:

    final ChannelPipeline p = ch.pipeline();
    p.addLast("decoder", new HttpRequestDecoder(4096, 8192, 8192, false));
    p.addLast("aggregator", new HttpObjectAggregator(100 * 1024 * 1024));
    p.addLast("encoder", new HttpResponseEncoder());
    p.addLast("handler", new WebServerHandler());

事件驱动组件是一个庞大而复杂的系统(实际上是一种编程语言),我没有包含任何信息,但我知道相关代码已被成功调用。我尝试执行以生成响应的实际代码是:

File file = new File(file_uri);
System.out.println("file: " + file.getAbsolutePath());
RandomAccessFile raf = new RandomAccessFile(file, "r");
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpUtil.setContentLength(response, raf.length());

ctx.write(response); 
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, file.length())); 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);

raf.close();

所以,我的问题是 - 如何在我发送响应之前停止 Netty 调用 channelReadComplete(...) 方法?

我认为您的 raf.close 调用应该在附加到文件写入的侦听器中进行。如所写,我希望您在写入调用有机会读取所有文件之前关闭文件。您的响应正确设置了内容长度,但如果您不写那么多字节,因为您提前关闭了文件,那么您的客户端将挂起等待其余部分。

您应该将编码器移到管道中的聚合器之前。聚合器可以发送编码器需要编码的响应。考虑使用 HttpServerCodec,因为它可以用一个处理程序替换您的 encoder/Decoder。

您使用 Object 作为类型扩展了 SimpleChannelInboundHandler,但随后拒绝了除 FullHttpRequest 消息之外的任何消息。如果您更改扩展中的类型,则 the parent class will take care of that check and casting for you (您使用的是 Netty 5 吗?为什么?)。

示例:

... extends SimpleChannelInboundHandler<FullHttpRequest> {
...
public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) {

我不认为 channelReadComplete 被提前调用对您的场景很重要。由于您已经在 read/response 处理程序中刷新,因此 channelReadComplete 中的刷新不是必需的。