为什么使用netty循环writeAndFlush发送DatagramPacket必须让线程休眠一段时间?

why use netty loop writeAndFlush to send DatagramPacket must sleep the thread for a while?

有个问题困扰了我很久。 那就是当我使用 netty 循环 writeAndFlush 将 DatagramPacket 发送到我的 udp 服务器时,大多数消息都丢失了。 但如果我让线程休眠一段时间,所有的消息都会被传送。 像这样:

public static void main(String[] args) throws InterruptedException {
    int count = 3000;
    AtomicInteger integer = new AtomicInteger(count);
    AtomicInteger countInteger = new AtomicInteger();
    Bootstrap bootstrap = new Bootstrap();
    EventLoopGroup group = new NioEventLoopGroup();
    ChannelFuture channelFuture = bootstrap.group(group)
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.SO_BROADCAST, true)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
            .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
            .handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new WakeupMessageEncoder())
                            .addLast(new WakeupMessageReplyDecoder())
                            .addLast(new SimpleChannelInboundHandler<WakeupMessageReply>() {

                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, WakeupMessageReply reply) throws Exception {
                                    countInteger.getAndIncrement();
                                    System.out.println(reply);
                                }
                            })
                            ;
                }
            }).bind(0).sync();
    Channel channel = channelFuture.channel();
    long s = System.currentTimeMillis();
    for (int i = 0; i < count; i++) {
        //TimeUnit.NANOSECONDS.sleep(1);
        WakeupMessage message = new WakeupMessage(UUID.randomUUID().toString(), "192.168.0.3:12000", "89860918700328360182", "test message" + i);
        channel.writeAndFlush(message).addListener(f -> integer.decrementAndGet());
    }
    while (true) {
        if (integer.get() <= 0) {
            break;
        }
    }
    try {
        channel.closeFuture();
        System.out.println("done:" + (System.currentTimeMillis() - s) + "ms");
        System.out.println(countInteger);
    }finally {
        group.shutdownGracefully();
    }
} 

我发送了 3000 条消息,但只收到很少... 像这样: 1744 messages 但是如果我像这样休眠线程:

for (int i = 0; i < count; i++) {
    TimeUnit.NANOSECONDS.sleep(1);
    WakeupMessage message = new WakeupMessage(UUID.randomUUID().toString(), "192.168.0.3:12000", "89860918700328360182", "test message" + i);
    channel.writeAndFlush(message).addListener(f -> integer.decrementAndGet());
}

我将从udp服务器接收所有重播。

那么为什么我必须休眠线程???

数据报数据包 (UDP) 无法保证送达,如果发送速度过快可能会被丢弃。有很多可能的原因,在您的情况下,您很可能填满了发送缓冲区或接收缓冲区或两者。 This 文章更详细。

另一种选择是 sync() 写入和刷新时的侦听器,即

channel.writeAndFlush(message).addListener(f -> integer.decrementAndGet()).sync();

在我的测试中,这获得了所有 3000 个响应,所用时间比睡眠时间短得多。对于我笔记本电脑上的本地 UDP 回显服务器:

  • 睡眠总耗时:3,758 毫秒
  • 同步总耗时:407 毫秒

但是,正如@ewramner 所指出的,这两个选项都不能保证您将获得所有 3000 个回复。