具有多个后端服务器的 Netty 代理
Netty Proxy With multiple Backend servers
我正在尝试使用 netty v4.0.30 编写代理服务器。我已经完成了发行版中包含的代理示例 (http://netty.io/4.0/xref/io/netty/example/proxy/package-summary.html)。但是我的要求有点不同。
在我的例子中,我的 netty 实例后面可以有多个服务器,所以我不能直接在 ChannelActive 方法中创建客户端 bootstrap。我的客户实际上是在向我的 netty 服务器发送两个请求(都是 TCP):-
请求 1:- 在端口 X 连接到后端服务器 A。此时我应该能够打开到我的后端服务器的连接并回复成功作为对客户端的响应
Request2:- 客户端在 netty 将转发到后端服务器的同一套接字上写入的实际数据。
由于可以有很多后端服务器,因此这两个请求。由于我仍在尝试学习 netty,因此任何有关相同的提示都会有很大帮助。
提前致谢。
编辑:
这是我的处理程序,它能够连接到第一个请求中提供的多个后端服务器:-
入站渠道处理程序
public class TunnelInboundHandler extends ChannelInboundHandlerAdapter {
// objects for client bootstrap and outbound channel
private Bootstrap b = new Bootstrap()
.group(new NioEventLoopGroup(1))
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.AUTO_READ, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,15000)
.option(ChannelOption.SO_SNDBUF, 1048576)
.option(ChannelOption.SO_RCVBUF, 1048576)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
private volatile Channel outboundChannel;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// to differentiate between request to connect and actual data
Attribute<Boolean> connected = ctx.attr(isConnected);
// to store outbound channel object
Attribute<Channel> channelContx = ctx.attr(channelContext);
// first request id of format - CONNECT-<IP>-<PORT>
if(connected.get() == null)
{
ByteBuf in = (ByteBuf) msg;
String connectDest = "";
try {
while (in.isReadable()) {
connectDest = connectDest + (char) in.readByte();
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg);
}
String[] connectDestArr = connectDest.split("-");
b.channel(ctx.channel().getClass());
b.handler(new NettyTargetHandlerInitilizer(ctx.channel()));
ChannelFuture f = b.connect(connectDestArr[1].trim(), Integer.parseInt(connectDestArr[2].trim()));
outboundChannel = f.channel();
channelContx.set(outboundChannel);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// connection complete start to read first data
ctx.channel().read();
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().close();
}
}
});
// response Success to client so that Actual request is sent
String response = "SUCCESS\n";
ByteBuf res = ctx.alloc().buffer(response.length());
res.writeBytes(response.getBytes());
ctx.write(res);
ctx.flush();
// set connected as true to identify first request completion
connected.set(true);
}else if(connected.get()){
if (channelContx.get().isActive()) {
channelContx.get().writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.cause().printStackTrace();
future.channel().close();
}
}
});
} else {
// System.out.println("Outbound Channel Not Active");
}
}
}
}
出站通道处理程序
public OutBoundTargetHandler(Channel inboundChannel) {
// System.out.println("Initlizing target pool");
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// System.out.println("Activating Chanel");
ctx.read();
ctx.write(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println("Receving data");
inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (inboundChannel.isActive()) {
inboundChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
一切都按预期工作,唯一的问题是我的客户端 bootstrap 在请求完成后没有关闭。因此,对于每个请求,我的线程数都会增加一个。有什么提示吗?
我认为您需要全局创建一个 ClientBootstrap,然后在您的服务器处理程序的读取方法中使用它,以便它创建到远程选择服务器的连接。
ClientBootstrap 在定义时不需要特定的远程服务器,但需要连接时间。
如有必要,您可能需要 add/remove 为所用服务器创建的 channelHandler 中的一些处理程序。为此,一旦客户端通道处于活动状态,您就可以根据需要执行必要的操作,例如根据远程地址进行选择。
然后您可以安全地继续并代理您的数据包。
编辑:
您可以使用 attr(key).set(primaryContext or primaryChannel)
将信息附加到频道或上下文,以便您转发信息。所以像:
static final AttributeKey<Channel> PRIMARY_CHANNEL =
AttributeKey.valueOf(YourHandler.class.getName() + ".PRIMARY_CHANNEL");
ChannelFuture future = bootstrap.connect(destination);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-connection operation
if (future.isSuccess()) {
future.channel.attr(PRIMARY_CHANNEL).set(primaryChannel);
// primaryChannel being the first channel in the proxy chain
} else {
// inform error on connection back to requester
primaryCtx.writeAndFluxh(error);
}
}
});
然后在第二个通道处理程序中执行您需要对此主通道执行的操作...
我正在尝试使用 netty v4.0.30 编写代理服务器。我已经完成了发行版中包含的代理示例 (http://netty.io/4.0/xref/io/netty/example/proxy/package-summary.html)。但是我的要求有点不同。
在我的例子中,我的 netty 实例后面可以有多个服务器,所以我不能直接在 ChannelActive 方法中创建客户端 bootstrap。我的客户实际上是在向我的 netty 服务器发送两个请求(都是 TCP):-
请求 1:- 在端口 X 连接到后端服务器 A。此时我应该能够打开到我的后端服务器的连接并回复成功作为对客户端的响应
Request2:- 客户端在 netty 将转发到后端服务器的同一套接字上写入的实际数据。
由于可以有很多后端服务器,因此这两个请求。由于我仍在尝试学习 netty,因此任何有关相同的提示都会有很大帮助。
提前致谢。
编辑:
这是我的处理程序,它能够连接到第一个请求中提供的多个后端服务器:-
入站渠道处理程序
public class TunnelInboundHandler extends ChannelInboundHandlerAdapter {
// objects for client bootstrap and outbound channel
private Bootstrap b = new Bootstrap()
.group(new NioEventLoopGroup(1))
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.AUTO_READ, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,15000)
.option(ChannelOption.SO_SNDBUF, 1048576)
.option(ChannelOption.SO_RCVBUF, 1048576)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
private volatile Channel outboundChannel;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// to differentiate between request to connect and actual data
Attribute<Boolean> connected = ctx.attr(isConnected);
// to store outbound channel object
Attribute<Channel> channelContx = ctx.attr(channelContext);
// first request id of format - CONNECT-<IP>-<PORT>
if(connected.get() == null)
{
ByteBuf in = (ByteBuf) msg;
String connectDest = "";
try {
while (in.isReadable()) {
connectDest = connectDest + (char) in.readByte();
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg);
}
String[] connectDestArr = connectDest.split("-");
b.channel(ctx.channel().getClass());
b.handler(new NettyTargetHandlerInitilizer(ctx.channel()));
ChannelFuture f = b.connect(connectDestArr[1].trim(), Integer.parseInt(connectDestArr[2].trim()));
outboundChannel = f.channel();
channelContx.set(outboundChannel);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// connection complete start to read first data
ctx.channel().read();
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().close();
}
}
});
// response Success to client so that Actual request is sent
String response = "SUCCESS\n";
ByteBuf res = ctx.alloc().buffer(response.length());
res.writeBytes(response.getBytes());
ctx.write(res);
ctx.flush();
// set connected as true to identify first request completion
connected.set(true);
}else if(connected.get()){
if (channelContx.get().isActive()) {
channelContx.get().writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.cause().printStackTrace();
future.channel().close();
}
}
});
} else {
// System.out.println("Outbound Channel Not Active");
}
}
}
}
出站通道处理程序
public OutBoundTargetHandler(Channel inboundChannel) {
// System.out.println("Initlizing target pool");
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// System.out.println("Activating Chanel");
ctx.read();
ctx.write(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println("Receving data");
inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (inboundChannel.isActive()) {
inboundChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
一切都按预期工作,唯一的问题是我的客户端 bootstrap 在请求完成后没有关闭。因此,对于每个请求,我的线程数都会增加一个。有什么提示吗?
我认为您需要全局创建一个 ClientBootstrap,然后在您的服务器处理程序的读取方法中使用它,以便它创建到远程选择服务器的连接。
ClientBootstrap 在定义时不需要特定的远程服务器,但需要连接时间。
如有必要,您可能需要 add/remove 为所用服务器创建的 channelHandler 中的一些处理程序。为此,一旦客户端通道处于活动状态,您就可以根据需要执行必要的操作,例如根据远程地址进行选择。
然后您可以安全地继续并代理您的数据包。
编辑:
您可以使用 attr(key).set(primaryContext or primaryChannel)
将信息附加到频道或上下文,以便您转发信息。所以像:
static final AttributeKey<Channel> PRIMARY_CHANNEL =
AttributeKey.valueOf(YourHandler.class.getName() + ".PRIMARY_CHANNEL");
ChannelFuture future = bootstrap.connect(destination);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-connection operation
if (future.isSuccess()) {
future.channel.attr(PRIMARY_CHANNEL).set(primaryChannel);
// primaryChannel being the first channel in the proxy chain
} else {
// inform error on connection back to requester
primaryCtx.writeAndFluxh(error);
}
}
});
然后在第二个通道处理程序中执行您需要对此主通道执行的操作...