Netty UDP 性能问题

Netty UDP Performance Issue

我实现了三个小型 UDP 服务器。一个使用普通 Java DatagramSocket(线程),一个使用 Netty,最后一个也使用 Netty,但使用线程消息处理(因为 Netty 不支持 UDP 的多线程)。

经过一些测量后,我得到了以下每秒请求数的结果:

我必须实现的实际应用程序必须处理 > 25.000 requests/second。所以我的问题是,如果我对 Netty 做错了什么,或者 Netty 的设计是否不是为了每秒处理那么多的连接?

实现如下

DatagramSocket 主

public static void main(String... args) throws Exception {
    final int port = Integer.parseInt(args[0]);
    final int threads = Integer.parseInt(args[1]);
    final int work = Integer.parseInt(args[2]);

    DATAGRAM_SOCKET = new DatagramSocket(port);

    for (int i = 0; i < threads; i++) {
        new Thread(new Handler(work)).start();
    }
}

DatagramSocket 处理程序

private static final class Handler implements Runnable {
    private final int work;

    public Handler(int work) throws SocketException {
        this.work = work;
    }

    @Override 
    public void run() {
        try {
            while (!DATAGRAM_SOCKET.isClosed()) {
                final DatagramPacket receivePacket = new DatagramPacket(new byte[1024], 1024);
                DATAGRAM_SOCKET.receive(receivePacket);
                final InetAddress ip = receivePacket.getAddress();
                final int port = receivePacket.getPort();
                final byte[] sendData = "Hey there".getBytes();
                Thread.sleep(RANDOM.nextInt(work));
                final DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, ip, port);
                DATAGRAM_SOCKET.send(sendPacket);
            }
        } catch (Exception e) {
            System.out.println("ERROR: " + e.getMessage());
        }
    }
}

Netty 主

public static void main(String[] args) throws Exception
{
    final int port = Integer.parseInt(args[0]);
    final int sleep = Integer.parseInt(args[1]);

    final Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(new NioEventLoopGroup());
    bootstrap.channel(NioDatagramChannel.class);
    bootstrap.handler(new MyNettyUdpHandler(sleep));
    bootstrap.bind(port).sync().channel().closeFuture().sync();
}

Netty 处理程序(线程)

public class MyNettyUdpHandler extends MessageToMessageDecoder<DatagramPacket> {
    private final Random random = new Random(System.currentTimeMillis());
    private final int sleep;

    public MyNettyUdpHandler(int sleep) {
        this.sleep = sleep;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
        new Thread(() -> {
            try {
                Thread.sleep(random.nextInt(sleep));
            } catch (InterruptedException e) {
                System.out.println("ERROR while sleeping");
            }

            final ByteBuf buffer = Unpooled.buffer(64);
            buffer.writeBytes("Hey there".getBytes());
            channelHandlerContext.channel().writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
        }).start();
    }
}

非线程的Netty Handler是一样的,只是没有线程。

您可以像这样更改您的 Netty decode() 方法,使其等同于 DatagramSocket 代码:

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
  final Channel channel = channelHandlerContext.channel();
  channel.eventLoop().schedule(() -> {
    final ByteBuf buffer = Unpooled.buffer(64);
    buffer.writeBytes("Hey there".getBytes());
    channel.writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
  }, random.nextInt(sleep), TimeUnit.MILLISECONDS);
}

但我猜 sleep() 代码正在模拟您稍后将执行的业务代码。 如果是这种情况,请确保您没有 运行 阻止处理程序中的代码。

编辑:

下面回答你的问题: 你对频道有点困惑。您在 bootstrap 中创建一个管道,然后绑定到某个端口。返回的通道是服务器通道。 handlers 方法中的通道(在你的例子中是你的 decode 方法),就像你在传统套接字编程中 accept() 时获得的套接字。请注意,您从传入的 DatagramPacket 中提取的端口 - 它大致相同。因此,您通过此通道将数据发送回客户端。

我编写的安排响应的代码与您的 DatagramSocket 代码以及您编写的线程 netty 代码的功能相同。 我不确定你为什么这样做,只是假设你有延迟响应的业务需求。 如果不是这种情况,您可以删除计划调用,您的代码将 运行 快得多。 如果您的业务逻辑是非阻塞的,并且 运行s 在几毫秒内,您就完成了。如果它是阻塞的,您需要尝试找到一个非阻塞的替代方案,或者 运行 在执行程序中,即不在事件循环中。

希望这对您有所帮助,即使这不是您最初问题的一部分。 Netty 很棒,我讨厌看到不好的例子和不好的氛围,所以我觉得它值得我花时间 ;)

在每个 decode() 中创建一个线程是低效的。 如果任务简单且不会阻塞,您可以按照Eran所说的将任务提交给channel.eventLoop()(实际上MesaggeToMessageDecoder中的decode()是由频道的EventLoop执行的,所以你不需要手动提交它,除非你想安排它)。 或者您可以将任务提交给 ThreadPoolExecutorEventExecutorGroup。 后者更好,因为您可以向 EventExecutorGroup.submit() 返回的 Future 添加侦听器,这样您就不必等待任务完成。 我的英语很差,希望这些能帮到你。

编辑: 可以这样写,只在EventLoop(即I/O线程中执行简单的逻辑代码):

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List list) throws Exception {
        //do something simple with datagramPacket
        ...

        final ByteBuf buffer = Unpooled.buffer(64);
        buffer.writeBytes("Hey there".getBytes());
        channelHandlerContext.channel().writeAndFlush(new DatagramPacket(buffer, datagramPacket.sender()));
}