Netty UDP 性能问题
Netty UDP Performance Issue
我实现了三个小型 UDP 服务器。一个使用普通 Java DatagramSocket(线程),一个使用 Netty,最后一个也使用 Netty,但使用线程消息处理(因为 Netty 不支持 UDP 的多线程)。
经过一些测量后,我得到了以下每秒请求数的结果:
- DatagramSocket ~30.000 requests/second
- Netty ~1.500 requests/second
- Netty(线程):~8.000 requests/second
我必须实现的实际应用程序必须处理 > 25.000 requests/second。所以我的问题是,如果我对 Netty 做错了什么,或者 Netty 的设计是否不是为了每秒处理那么多的连接?
实现如下
DatagramSocket 主
public static void main(String... args) throws Exception {
final int port = Integer.parseInt(args[0]);
final int threads = Integer.parseInt(args[1]);
final int work = Integer.parseInt(args[2]);
DATAGRAM_SOCKET = new DatagramSocket(port);
for (int i = 0; i < threads; i++) {
new Thread(new Handler(work)).start();
}
}
DatagramSocket 处理程序
private static final class Handler implements Runnable {
private final int work;
public Handler(int work) throws SocketException {
this.work = work;
}
@Override
public void run() {
try {
while (!DATAGRAM_SOCKET.isClosed()) {
final DatagramPacket receivePacket = new DatagramPacket(new byte[1024], 1024);
DATAGRAM_SOCKET.receive(receivePacket);
final InetAddress ip = receivePacket.getAddress();
final int port = receivePacket.getPort();
final byte[] sendData = "Hey there".getBytes();
Thread.sleep(RANDOM.nextInt(work));
final DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ip, port);
DATAGRAM_SOCKET.send(sendPacket);
}
} catch (Exception e) {
System.out.println("ERROR: " + e.getMessage());
}
}
}
Netty 主
public static void main(String[] args) throws Exception
{
final int port = Integer.parseInt(args[0]);
final int sleep = Integer.parseInt(args[1]);
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup());
bootstrap.channel(NioDatagramChannel.class);
bootstrap.handler(new MyNettyUdpHandler(sleep));
bootstrap.bind(port).sync().channel().closeFuture().sync();
}
Netty 处理程序(线程)
public class MyNettyUdpHandler extends MessageToMessageDecoder<DatagramPacket> {
private final Random random = new Random(System.currentTimeMillis());
private final int sleep;
public MyNettyUdpHandler(int sleep) {
this.sleep = sleep;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
new Thread(() -> {
try {
Thread.sleep(random.nextInt(sleep));
} catch (InterruptedException e) {
System.out.println("ERROR while sleeping");
}
final ByteBuf buffer = Unpooled.buffer(64);
buffer.writeBytes("Hey there".getBytes());
channelHandlerContext.channel().writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
}).start();
}
}
非线程的Netty Handler是一样的,只是没有线程。
您可以像这样更改您的 Netty decode() 方法,使其等同于 DatagramSocket 代码:
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
final Channel channel = channelHandlerContext.channel();
channel.eventLoop().schedule(() -> {
final ByteBuf buffer = Unpooled.buffer(64);
buffer.writeBytes("Hey there".getBytes());
channel.writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
}, random.nextInt(sleep), TimeUnit.MILLISECONDS);
}
但我猜 sleep() 代码正在模拟您稍后将执行的业务代码。
如果是这种情况,请确保您没有 运行 阻止处理程序中的代码。
编辑:
下面回答你的问题:
你对频道有点困惑。您在 bootstrap 中创建一个管道,然后绑定到某个端口。返回的通道是服务器通道。 handlers 方法中的通道(在你的例子中是你的 decode 方法),就像你在传统套接字编程中 accept()
时获得的套接字。请注意,您从传入的 DatagramPacket 中提取的端口 - 它大致相同。因此,您通过此通道将数据发送回客户端。
我编写的安排响应的代码与您的 DatagramSocket 代码以及您编写的线程 netty 代码的功能相同。
我不确定你为什么这样做,只是假设你有延迟响应的业务需求。
如果不是这种情况,您可以删除计划调用,您的代码将 运行 快得多。
如果您的业务逻辑是非阻塞的,并且 运行s 在几毫秒内,您就完成了。如果它是阻塞的,您需要尝试找到一个非阻塞的替代方案,或者 运行 在执行程序中,即不在事件循环中。
希望这对您有所帮助,即使这不是您最初问题的一部分。 Netty 很棒,我讨厌看到不好的例子和不好的氛围,所以我觉得它值得我花时间 ;)
在每个 decode() 中创建一个线程是低效的。
如果任务简单且不会阻塞,您可以按照Eran所说的将任务提交给channel.eventLoop()
(实际上MesaggeToMessageDecoder
中的decode()
是由频道的EventLoop
执行的,所以你不需要手动提交它,除非你想安排它)。
或者您可以将任务提交给 ThreadPoolExecutor
或 EventExecutorGroup
。
后者更好,因为您可以向 EventExecutorGroup.submit()
返回的 Future
添加侦听器,这样您就不必等待任务完成。
我的英语很差,希望这些能帮到你。
编辑:
可以这样写,只在EventLoop
(即I/O线程中执行简单的逻辑代码):
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
//do something simple with datagramPacket
...
final ByteBuf buffer = Unpooled.buffer(64);
buffer.writeBytes("Hey there".getBytes());
channelHandlerContext.channel().writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
}
我实现了三个小型 UDP 服务器。一个使用普通 Java DatagramSocket(线程),一个使用 Netty,最后一个也使用 Netty,但使用线程消息处理(因为 Netty 不支持 UDP 的多线程)。
经过一些测量后,我得到了以下每秒请求数的结果:
- DatagramSocket ~30.000 requests/second
- Netty ~1.500 requests/second
- Netty(线程):~8.000 requests/second
我必须实现的实际应用程序必须处理 > 25.000 requests/second。所以我的问题是,如果我对 Netty 做错了什么,或者 Netty 的设计是否不是为了每秒处理那么多的连接?
实现如下
DatagramSocket 主
public static void main(String... args) throws Exception {
final int port = Integer.parseInt(args[0]);
final int threads = Integer.parseInt(args[1]);
final int work = Integer.parseInt(args[2]);
DATAGRAM_SOCKET = new DatagramSocket(port);
for (int i = 0; i < threads; i++) {
new Thread(new Handler(work)).start();
}
}
DatagramSocket 处理程序
private static final class Handler implements Runnable {
private final int work;
public Handler(int work) throws SocketException {
this.work = work;
}
@Override
public void run() {
try {
while (!DATAGRAM_SOCKET.isClosed()) {
final DatagramPacket receivePacket = new DatagramPacket(new byte[1024], 1024);
DATAGRAM_SOCKET.receive(receivePacket);
final InetAddress ip = receivePacket.getAddress();
final int port = receivePacket.getPort();
final byte[] sendData = "Hey there".getBytes();
Thread.sleep(RANDOM.nextInt(work));
final DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ip, port);
DATAGRAM_SOCKET.send(sendPacket);
}
} catch (Exception e) {
System.out.println("ERROR: " + e.getMessage());
}
}
}
Netty 主
public static void main(String[] args) throws Exception
{
final int port = Integer.parseInt(args[0]);
final int sleep = Integer.parseInt(args[1]);
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup());
bootstrap.channel(NioDatagramChannel.class);
bootstrap.handler(new MyNettyUdpHandler(sleep));
bootstrap.bind(port).sync().channel().closeFuture().sync();
}
Netty 处理程序(线程)
public class MyNettyUdpHandler extends MessageToMessageDecoder<DatagramPacket> {
private final Random random = new Random(System.currentTimeMillis());
private final int sleep;
public MyNettyUdpHandler(int sleep) {
this.sleep = sleep;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
new Thread(() -> {
try {
Thread.sleep(random.nextInt(sleep));
} catch (InterruptedException e) {
System.out.println("ERROR while sleeping");
}
final ByteBuf buffer = Unpooled.buffer(64);
buffer.writeBytes("Hey there".getBytes());
channelHandlerContext.channel().writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
}).start();
}
}
非线程的Netty Handler是一样的,只是没有线程。
您可以像这样更改您的 Netty decode() 方法,使其等同于 DatagramSocket 代码:
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
final Channel channel = channelHandlerContext.channel();
channel.eventLoop().schedule(() -> {
final ByteBuf buffer = Unpooled.buffer(64);
buffer.writeBytes("Hey there".getBytes());
channel.writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
}, random.nextInt(sleep), TimeUnit.MILLISECONDS);
}
但我猜 sleep() 代码正在模拟您稍后将执行的业务代码。 如果是这种情况,请确保您没有 运行 阻止处理程序中的代码。
编辑:
下面回答你的问题:
你对频道有点困惑。您在 bootstrap 中创建一个管道,然后绑定到某个端口。返回的通道是服务器通道。 handlers 方法中的通道(在你的例子中是你的 decode 方法),就像你在传统套接字编程中 accept()
时获得的套接字。请注意,您从传入的 DatagramPacket 中提取的端口 - 它大致相同。因此,您通过此通道将数据发送回客户端。
我编写的安排响应的代码与您的 DatagramSocket 代码以及您编写的线程 netty 代码的功能相同。 我不确定你为什么这样做,只是假设你有延迟响应的业务需求。 如果不是这种情况,您可以删除计划调用,您的代码将 运行 快得多。 如果您的业务逻辑是非阻塞的,并且 运行s 在几毫秒内,您就完成了。如果它是阻塞的,您需要尝试找到一个非阻塞的替代方案,或者 运行 在执行程序中,即不在事件循环中。
希望这对您有所帮助,即使这不是您最初问题的一部分。 Netty 很棒,我讨厌看到不好的例子和不好的氛围,所以我觉得它值得我花时间 ;)
在每个 decode() 中创建一个线程是低效的。
如果任务简单且不会阻塞,您可以按照Eran所说的将任务提交给channel.eventLoop()
(实际上MesaggeToMessageDecoder
中的decode()
是由频道的EventLoop
执行的,所以你不需要手动提交它,除非你想安排它)。
或者您可以将任务提交给 ThreadPoolExecutor
或 EventExecutorGroup
。
后者更好,因为您可以向 EventExecutorGroup.submit()
返回的 Future
添加侦听器,这样您就不必等待任务完成。
我的英语很差,希望这些能帮到你。
编辑:
可以这样写,只在EventLoop
(即I/O线程中执行简单的逻辑代码):
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
//do something simple with datagramPacket
...
final ByteBuf buffer = Unpooled.buffer(64);
buffer.writeBytes("Hey there".getBytes());
channelHandlerContext.channel().writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
}