是否可以在全双工 TCP 通信中使用 Netty?
Is it possible to use Netty in full duplex TCP communication?
Netty 似乎只能通过单个 TCP 连接处理读取或写入操作,但不能同时处理两者。我有一个连接到用 Netty 编写的回显服务器应用程序并发送大约 200k 消息的客户端。
回显服务器只接受客户端连接并发回客户端发送的任何消息。
问题是我无法让 Netty 在全双工模式下使用 TCP 连接。我想在服务器端同时处理读写操作。在我的例子中,Netty 从客户端读取所有消息,然后将它们发回,这导致了高延迟。
客户端应用程序为每个连接触发两个线程。一个用于任何写操作,另一个用于读操作。是的,客户端是以普通的旧 Java IO 风格编写的。
也许问题与我在服务器端设置的 TCP 选项有关:
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, bufferWatermarkHigh)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, bufferWatermarkLow)
.childOption(ChannelOption.SO_RCVBUF, bufferInSize)
.childOption(ChannelOption.SO_SNDBUF, bufferOutSize)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
在您使用 github 存储库提供的示例中,有多个错误:
你直接从channelActive
方法写
在netty中,有一个策略是每个handler同时只执行一个传入的方法,这是为了让开发更简单,同时保证class的方法被正确执行为了确保方法的副作用在其他 classes.
中可见
- 您正在打印
channelReadComplete
中的消息
channelReadComplete
在清除当前消息缓冲区后调用,channelRead
在调用前可能会调用多次
- 缺少成帧器
构建消息或计算消息的大小是检测内部有多少字节的方法。如果没有这个成帧器,客户端的 2 次写入可能是服务器的 1 次读取,作为测试,我使用了 new io.netty.handler.codec.FixedLengthFrameDecoder(120)
,所以我可以使用 i++
计算到达服务器和客户端的消息数量。
**使用重量级打印进行轻量级操作。
根据我的分析器,大部分时间都花在调用 LOG.info()
上,记录器通常就是这种情况,因为它们在幕后做很多事情,比如输出流的同步。通过让记录器只记录每 1000 条消息,我得到了巨大的速度提升(而且我的计算机非常慢,因为我 运行 双核...)
重发码
发送代码每次都会重新创建 ByteBuf
。通过重用 ByteBuf
可以进一步提高发送速度,您可以通过创建 ByteBuf 1 次,然后在每次传递它时调用 .retain()
来实现。
这很容易完成:
ByteBuf buf = createMessage(MESSAGE_SIZE);
for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
ctx.writeAndFlush(buf.retain());
}
减少刷新量
通过减少刷新量,可以获得更高的本机性能。每次调用 flush() 都是调用网络堆栈以发送未决消息。如果我们将该规则应用于上面的代码,它将给出以下代码:
ByteBuf buf = createMessage(MESSAGE_SIZE);
for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
ctx.write(buf.retain());
}
ctx.flush();
最终代码
有时候,您只想看到结果并亲自尝试一下:
App.java(不变)
public class App {
public static void main( String[] args ) throws InterruptedException {
final int PORT = 8080;
runInSeparateThread(() -> new Server(PORT));
runInSeparateThread(() -> new Client(PORT));
}
private static void runInSeparateThread(Runnable runnable) {
new Thread(runnable).start();
}
}
Client.java
public class Client {
public Client(int port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = createBootstrap(group).connect("192.168.171.102", port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private Bootstrap createBootstrap(EventLoopGroup group) {
return new Bootstrap().group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
ch.pipeline().addLast(new ClientHandler());
}
}
);
}
}
ClientHandler.java
public class ClientHandler extends ChannelInboundHandlerAdapter {
private final Logger LOG = LoggerFactory.getLogger(ClientHandler.class.getSimpleName());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final int MESSAGE_SIZE = 200;
final int NUMBER_OF_MESSAGES = 200000;
new Thread(()->{
ByteBuf buf = createMessage(MESSAGE_SIZE);
for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
ctx.writeAndFlush(buf.retain());
}}).start();
}
int i;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(i++%10000==0)
LOG.info("Got a message back from the server "+(i));
((io.netty.util.ReferenceCounted)msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private ByteBuf createMessage(int size) {
ByteBuf message = Unpooled.buffer(size);
for (int i = 0; i < size; ++i) {
message.writeByte((byte) i);
}
return message;
}
}
Server.java
public class Server {
public Server(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = createServerBootstrap(bossGroup, workerGroup).bind(port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private ServerBootstrap createServerBootstrap(EventLoopGroup bossGroup,
EventLoopGroup workerGroup) {
return new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
ch.pipeline().addLast(new ServerHandler());
}
});
}
}
ServerHandler.java
public class ServerHandler extends ChannelInboundHandlerAdapter {
private final Logger LOG = LoggerFactory.getLogger(ServerHandler.class.getSimpleName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(msg).addListener(f->{if(f.cause()!=null)LOG.info(f.cause().toString());});
if(i++%10000==0)
LOG.info("Send the message back to the client "+(i));
;
}
int i;
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// LOG.info("Send the message back to the client "+(i++));
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
测试结果
我决定测试如果我更改记录的 incmoing 消息的频率会发生什么,这些是测试结果:
What to print: max message latency time taken*
(always) > 20000 >10 min
i++ % 10 == 0 > 20000 >10 min
i++ % 100 == 0 16000 4 min
i++ % 1000 == 0 0-3000 51 sec
i++ % 10000 == 0 <10000 22 sec
* 时间应该有所保留,没有真正的基准测试,只有 1 个程序的快速拍摄 运行
这表明通过减少对日志的调用量(精度),我们可以获得更好的传输率(速度)。
Netty 似乎只能通过单个 TCP 连接处理读取或写入操作,但不能同时处理两者。我有一个连接到用 Netty 编写的回显服务器应用程序并发送大约 200k 消息的客户端。
回显服务器只接受客户端连接并发回客户端发送的任何消息。
问题是我无法让 Netty 在全双工模式下使用 TCP 连接。我想在服务器端同时处理读写操作。在我的例子中,Netty 从客户端读取所有消息,然后将它们发回,这导致了高延迟。
客户端应用程序为每个连接触发两个线程。一个用于任何写操作,另一个用于读操作。是的,客户端是以普通的旧 Java IO 风格编写的。
也许问题与我在服务器端设置的 TCP 选项有关:
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, bufferWatermarkHigh)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, bufferWatermarkLow)
.childOption(ChannelOption.SO_RCVBUF, bufferInSize)
.childOption(ChannelOption.SO_SNDBUF, bufferOutSize)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
在您使用 github 存储库提供的示例中,有多个错误:
你直接从
channelActive
方法写在netty中,有一个策略是每个handler同时只执行一个传入的方法,这是为了让开发更简单,同时保证class的方法被正确执行为了确保方法的副作用在其他 classes.
中可见- 您正在打印
channelReadComplete
中的消息
channelReadComplete
在清除当前消息缓冲区后调用,channelRead
在调用前可能会调用多次- 缺少成帧器
构建消息或计算消息的大小是检测内部有多少字节的方法。如果没有这个成帧器,客户端的 2 次写入可能是服务器的 1 次读取,作为测试,我使用了
new io.netty.handler.codec.FixedLengthFrameDecoder(120)
,所以我可以使用i++
计算到达服务器和客户端的消息数量。- 您正在打印
**使用重量级打印进行轻量级操作。
根据我的分析器,大部分时间都花在调用
LOG.info()
上,记录器通常就是这种情况,因为它们在幕后做很多事情,比如输出流的同步。通过让记录器只记录每 1000 条消息,我得到了巨大的速度提升(而且我的计算机非常慢,因为我 运行 双核...)重发码
发送代码每次都会重新创建
ByteBuf
。通过重用ByteBuf
可以进一步提高发送速度,您可以通过创建 ByteBuf 1 次,然后在每次传递它时调用.retain()
来实现。这很容易完成:
ByteBuf buf = createMessage(MESSAGE_SIZE); for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) { ctx.writeAndFlush(buf.retain()); }
减少刷新量
通过减少刷新量,可以获得更高的本机性能。每次调用 flush() 都是调用网络堆栈以发送未决消息。如果我们将该规则应用于上面的代码,它将给出以下代码:
ByteBuf buf = createMessage(MESSAGE_SIZE); for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) { ctx.write(buf.retain()); } ctx.flush();
最终代码
有时候,您只想看到结果并亲自尝试一下:
App.java(不变)
public class App {
public static void main( String[] args ) throws InterruptedException {
final int PORT = 8080;
runInSeparateThread(() -> new Server(PORT));
runInSeparateThread(() -> new Client(PORT));
}
private static void runInSeparateThread(Runnable runnable) {
new Thread(runnable).start();
}
}
Client.java
public class Client {
public Client(int port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = createBootstrap(group).connect("192.168.171.102", port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private Bootstrap createBootstrap(EventLoopGroup group) {
return new Bootstrap().group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
ch.pipeline().addLast(new ClientHandler());
}
}
);
}
}
ClientHandler.java
public class ClientHandler extends ChannelInboundHandlerAdapter {
private final Logger LOG = LoggerFactory.getLogger(ClientHandler.class.getSimpleName());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final int MESSAGE_SIZE = 200;
final int NUMBER_OF_MESSAGES = 200000;
new Thread(()->{
ByteBuf buf = createMessage(MESSAGE_SIZE);
for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
ctx.writeAndFlush(buf.retain());
}}).start();
}
int i;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(i++%10000==0)
LOG.info("Got a message back from the server "+(i));
((io.netty.util.ReferenceCounted)msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private ByteBuf createMessage(int size) {
ByteBuf message = Unpooled.buffer(size);
for (int i = 0; i < size; ++i) {
message.writeByte((byte) i);
}
return message;
}
}
Server.java
public class Server {
public Server(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = createServerBootstrap(bossGroup, workerGroup).bind(port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private ServerBootstrap createServerBootstrap(EventLoopGroup bossGroup,
EventLoopGroup workerGroup) {
return new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
ch.pipeline().addLast(new ServerHandler());
}
});
}
}
ServerHandler.java
public class ServerHandler extends ChannelInboundHandlerAdapter {
private final Logger LOG = LoggerFactory.getLogger(ServerHandler.class.getSimpleName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(msg).addListener(f->{if(f.cause()!=null)LOG.info(f.cause().toString());});
if(i++%10000==0)
LOG.info("Send the message back to the client "+(i));
;
}
int i;
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// LOG.info("Send the message back to the client "+(i++));
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
测试结果
我决定测试如果我更改记录的 incmoing 消息的频率会发生什么,这些是测试结果:
What to print: max message latency time taken*
(always) > 20000 >10 min
i++ % 10 == 0 > 20000 >10 min
i++ % 100 == 0 16000 4 min
i++ % 1000 == 0 0-3000 51 sec
i++ % 10000 == 0 <10000 22 sec
* 时间应该有所保留,没有真正的基准测试,只有 1 个程序的快速拍摄 运行
这表明通过减少对日志的调用量(精度),我们可以获得更好的传输率(速度)。