netty 非阻塞反向代理

Non-blocking reverse proxy with netty

我正在尝试使用 netty 4.1 编写非阻塞代理。我有一个 "FrontHandler" 来处理传入连接,然后是一个 "BackHandler" 来处理传出连接。我正在关注 HexDumpProxyHandler (https://github.com/netty/netty/blob/ed4a89082bb29b9e7d869c5d25d6b9ea8fc9d25b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java#L67)

在这段代码中我发现:

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {, I've seen:

意味着仅当出站客户端连接已经准备好时才写入传入消息。这在 HTTP 代理情况下显然不理想,所以我在想什么是处理它的最佳方式。

我想知道在前端连接上禁用自动读取(并且只有在传出客户端连接准备就绪后才手动触发读取)是否是一个不错的选择。然后我可以在后端处理程序的 "channelActive" 事件中再次通过子套接字启用自动读取。但是,我不确定每次 "read()" 调用的处理程序中会收到多少消息(使用 HttpDecoder,我假设我会得到初始的 HttpRequest,但我真的很想避免得到后续的 HttpContent / LastHttpContent 消息,直到我再次手动触发 read() 并通过通道启用自动读取。

另一种选择是使用 Promise 从客户端 ChannelPool 获取 Channel:

private void setCurrentBackend(HttpRequest request) {
    pool.acquire(request, backendPromise);

    backendPromise.addListener((FutureListener<Channel>) future -> {
        Channel c = future.get();
        if (!currentBackend.compareAndSet(null, c)) {
            pool.release(c);
            throw new IllegalStateException();
        }
    });
}

然后通过该承诺执行从输入到输出的复制。例如:

private void handleLastContent(ChannelHandlerContext frontCtx, LastHttpContent lastContent) {
    doInBackend(c -> {
        c.writeAndFlush(lastContent).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                future.channel().read();
            } else {
                pool.release(c);
                frontCtx.close();
            }
        });
    });
}
private void doInBackend(Consumer<Channel> action) {
    Channel c = currentBackend.get();
    if (c == null) {
        backendPromise.addListener((FutureListener<Channel>) future -> action.accept(future.get()));
    } else {
        action.accept(c);
    }
}

但我不确定永远保持承诺并通过向其添加侦听器来完成从 "front" 到 "back" 的所有写入有多好。我也不确定如何实例化承诺,以便在正确的线程中执行操作......现在我正在使用:

backendPromise = group.next().<Channel> newPromise(); // bad
// or
backendPromise = frontCtx.channel().eventLoop().newPromise(); // OK?

(其中组与前端 ServerBootstrap 中使用的 eventLoopGroup 相同)。

如果它们没有通过正确的线程处理,我认为在 "doInBackend" 方法中进行 "else { }" 优化以避免使用 Promise 并直接写入通道可能会有问题.

我曾经开发过类似的基于 MQTT 协议的代理应用程序。所以它基本上被用来创建一个实时聊天应用程序。然而,我必须设计的应用程序本质上是异步的,所以我自然不会遇到任何此类问题。因为万一

outboundChannel.isActive() == false

然后我可以简单地将消息保存在队列或持久数据库中,然后在 outboundChannel 启动后处理它们。但是,由于您谈论的是 HTTP 应用程序,所以这意味着该应用程序本质上是同步的,这意味着客户端无法继续发送数据包,直到 outboundChannel 启动并且 运行。因此,您建议的选项是只有在通道处于活动状态时才会读取数据包,您可以通过在 ChannelConfig 中禁用自动读取来手动处理消息读取。

但是,我想建议的是,您应该检查 outboundChannel 是否处于活动状态。如果通道处于活动状态,则将数据包转发以进行处理。如果通道未激活,您应该通过发回类似于 Error404

的响应来拒绝该数据包

除此之外,您还应该将客户端配置为在一定时间间隔后继续重试发送数据包,并相应地处理需要做的事情,以防通道花费太长时间才能激活并变得可读。手动处理 channelRead 通常不是首选,并且是一种反模式。你应该让 Netty 以最有效的方式为你处理。

非自动读取方法本身不起作用,因为即使只执行了一次 read(),HttpRequestDecoder 也会创建多条消息。

我已经通过使用链式 CompletableFutures 解决了它。