netty 非阻塞反向代理
Non-blocking reverse proxy with netty
我正在尝试使用 netty 4.1 编写非阻塞代理。我有一个 "FrontHandler" 来处理传入连接,然后是一个 "BackHandler" 来处理传出连接。我正在关注 HexDumpProxyHandler (https://github.com/netty/netty/blob/ed4a89082bb29b9e7d869c5d25d6b9ea8fc9d25b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java#L67)
在这段代码中我发现:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {, I've seen:
意味着仅当出站客户端连接已经准备好时才写入传入消息。这在 HTTP 代理情况下显然不理想,所以我在想什么是处理它的最佳方式。
我想知道在前端连接上禁用自动读取(并且只有在传出客户端连接准备就绪后才手动触发读取)是否是一个不错的选择。然后我可以在后端处理程序的 "channelActive" 事件中再次通过子套接字启用自动读取。但是,我不确定每次 "read()" 调用的处理程序中会收到多少消息(使用 HttpDecoder,我假设我会得到初始的 HttpRequest,但我真的很想避免得到后续的 HttpContent / LastHttpContent 消息,直到我再次手动触发 read() 并通过通道启用自动读取。
另一种选择是使用 Promise 从客户端 ChannelPool 获取 Channel:
private void setCurrentBackend(HttpRequest request) {
pool.acquire(request, backendPromise);
backendPromise.addListener((FutureListener<Channel>) future -> {
Channel c = future.get();
if (!currentBackend.compareAndSet(null, c)) {
pool.release(c);
throw new IllegalStateException();
}
});
}
然后通过该承诺执行从输入到输出的复制。例如:
private void handleLastContent(ChannelHandlerContext frontCtx, LastHttpContent lastContent) {
doInBackend(c -> {
c.writeAndFlush(lastContent).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
future.channel().read();
} else {
pool.release(c);
frontCtx.close();
}
});
});
}
private void doInBackend(Consumer<Channel> action) {
Channel c = currentBackend.get();
if (c == null) {
backendPromise.addListener((FutureListener<Channel>) future -> action.accept(future.get()));
} else {
action.accept(c);
}
}
但我不确定永远保持承诺并通过向其添加侦听器来完成从 "front" 到 "back" 的所有写入有多好。我也不确定如何实例化承诺,以便在正确的线程中执行操作......现在我正在使用:
backendPromise = group.next().<Channel> newPromise(); // bad
// or
backendPromise = frontCtx.channel().eventLoop().newPromise(); // OK?
(其中组与前端 ServerBootstrap 中使用的 eventLoopGroup 相同)。
如果它们没有通过正确的线程处理,我认为在 "doInBackend" 方法中进行 "else { }" 优化以避免使用 Promise 并直接写入通道可能会有问题.
我曾经开发过类似的基于 MQTT 协议的代理应用程序。所以它基本上被用来创建一个实时聊天应用程序。然而,我必须设计的应用程序本质上是异步的,所以我自然不会遇到任何此类问题。因为万一
outboundChannel.isActive() == false
然后我可以简单地将消息保存在队列或持久数据库中,然后在 outboundChannel 启动后处理它们。但是,由于您谈论的是 HTTP 应用程序,所以这意味着该应用程序本质上是同步的,这意味着客户端无法继续发送数据包,直到 outboundChannel 启动并且 运行。因此,您建议的选项是只有在通道处于活动状态时才会读取数据包,您可以通过在 ChannelConfig
中禁用自动读取来手动处理消息读取。
但是,我想建议的是,您应该检查 outboundChannel 是否处于活动状态。如果通道处于活动状态,则将数据包转发以进行处理。如果通道未激活,您应该通过发回类似于 Error404
的响应来拒绝该数据包
除此之外,您还应该将客户端配置为在一定时间间隔后继续重试发送数据包,并相应地处理需要做的事情,以防通道花费太长时间才能激活并变得可读。手动处理 channelRead 通常不是首选,并且是一种反模式。你应该让 Netty 以最有效的方式为你处理。
非自动读取方法本身不起作用,因为即使只执行了一次 read(),HttpRequestDecoder 也会创建多条消息。
我已经通过使用链式 CompletableFutures 解决了它。
我正在尝试使用 netty 4.1 编写非阻塞代理。我有一个 "FrontHandler" 来处理传入连接,然后是一个 "BackHandler" 来处理传出连接。我正在关注 HexDumpProxyHandler (https://github.com/netty/netty/blob/ed4a89082bb29b9e7d869c5d25d6b9ea8fc9d25b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java#L67)
在这段代码中我发现:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {, I've seen:
意味着仅当出站客户端连接已经准备好时才写入传入消息。这在 HTTP 代理情况下显然不理想,所以我在想什么是处理它的最佳方式。
我想知道在前端连接上禁用自动读取(并且只有在传出客户端连接准备就绪后才手动触发读取)是否是一个不错的选择。然后我可以在后端处理程序的 "channelActive" 事件中再次通过子套接字启用自动读取。但是,我不确定每次 "read()" 调用的处理程序中会收到多少消息(使用 HttpDecoder,我假设我会得到初始的 HttpRequest,但我真的很想避免得到后续的 HttpContent / LastHttpContent 消息,直到我再次手动触发 read() 并通过通道启用自动读取。
另一种选择是使用 Promise 从客户端 ChannelPool 获取 Channel:
private void setCurrentBackend(HttpRequest request) {
pool.acquire(request, backendPromise);
backendPromise.addListener((FutureListener<Channel>) future -> {
Channel c = future.get();
if (!currentBackend.compareAndSet(null, c)) {
pool.release(c);
throw new IllegalStateException();
}
});
}
然后通过该承诺执行从输入到输出的复制。例如:
private void handleLastContent(ChannelHandlerContext frontCtx, LastHttpContent lastContent) {
doInBackend(c -> {
c.writeAndFlush(lastContent).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
future.channel().read();
} else {
pool.release(c);
frontCtx.close();
}
});
});
}
private void doInBackend(Consumer<Channel> action) {
Channel c = currentBackend.get();
if (c == null) {
backendPromise.addListener((FutureListener<Channel>) future -> action.accept(future.get()));
} else {
action.accept(c);
}
}
但我不确定永远保持承诺并通过向其添加侦听器来完成从 "front" 到 "back" 的所有写入有多好。我也不确定如何实例化承诺,以便在正确的线程中执行操作......现在我正在使用:
backendPromise = group.next().<Channel> newPromise(); // bad
// or
backendPromise = frontCtx.channel().eventLoop().newPromise(); // OK?
(其中组与前端 ServerBootstrap 中使用的 eventLoopGroup 相同)。
如果它们没有通过正确的线程处理,我认为在 "doInBackend" 方法中进行 "else { }" 优化以避免使用 Promise 并直接写入通道可能会有问题.
我曾经开发过类似的基于 MQTT 协议的代理应用程序。所以它基本上被用来创建一个实时聊天应用程序。然而,我必须设计的应用程序本质上是异步的,所以我自然不会遇到任何此类问题。因为万一
outboundChannel.isActive() == false
然后我可以简单地将消息保存在队列或持久数据库中,然后在 outboundChannel 启动后处理它们。但是,由于您谈论的是 HTTP 应用程序,所以这意味着该应用程序本质上是同步的,这意味着客户端无法继续发送数据包,直到 outboundChannel 启动并且 运行。因此,您建议的选项是只有在通道处于活动状态时才会读取数据包,您可以通过在 ChannelConfig
中禁用自动读取来手动处理消息读取。
但是,我想建议的是,您应该检查 outboundChannel 是否处于活动状态。如果通道处于活动状态,则将数据包转发以进行处理。如果通道未激活,您应该通过发回类似于 Error404
的响应来拒绝该数据包除此之外,您还应该将客户端配置为在一定时间间隔后继续重试发送数据包,并相应地处理需要做的事情,以防通道花费太长时间才能激活并变得可读。手动处理 channelRead 通常不是首选,并且是一种反模式。你应该让 Netty 以最有效的方式为你处理。
非自动读取方法本身不起作用,因为即使只执行了一次 read(),HttpRequestDecoder 也会创建多条消息。
我已经通过使用链式 CompletableFutures 解决了它。