带 Netty 的多线程 UDP 服务器

Multi threaded UDP server with Netty

我正在尝试使用 Netty 实现 UDP 服务器。这个想法是只绑定一次(因此只创建一个 Channel)。此 Channel 仅使用一个处理程序进行初始化,该处理程序通过 ExecutorService.

在多个线程之间分派传入数据报的处理。
@Configuration
public class SpringConfig {

    @Autowired
    private Dispatcher dispatcher;

    private String host;

    private int port;

    @Bean
    public Bootstrap bootstrap() throws Exception {
        Bootstrap bootstrap = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(dispatcher);

        ChannelFuture future = bootstrap.bind(host, port).await();
        if(!future.isSuccess())
            throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());

        return bootstrap;
    }
}

@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {

    private int workerThreads;

    private ExecutorService executorService;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DatagramPacket packet = (DatagramPacket) msg;

        final Channel channel = ctx.channel();

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //Process the packet and produce a response packet (below)              
                DatagramPacket responsePacket = ...;

                ChannelFuture future;
                try {
                    future = channel.writeAndFlush(responsePacket).await();
                } catch (InterruptedException e) {
                    return;
                }
                if(!future.isSuccess())
                    log.warn("Failed to write response packet.");
            }
        });
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        executorService = Executors.newFixedThreadPool(workerThreads);
    }
}

我有以下问题:

  1. Dispatcherclass的channelRead方法接收到的DatagramPacket在被工作线程使用之前是否应该复制?我想知道这个数据包是否在 channelRead 方法 returns 之后被销毁,即使工作线程保留了一个引用。
  2. 在所有工作线程之间共享 Channel 并让它们同时调用 writeAndFlush 是否安全?

谢谢!

  1. 没有。如果你需要这个对象活得更久,你要么把它变成别的东西,要么使用 ReferenceCountUtil.retain(datagram) ,然后在你完成它后使用 ReferenceCountUtil.release(datagram) 。你也不应该在执行者服务上做 await(),你应该为发生的任何事情注册一个处理程序。

  2. 是的,通道对象是线程安全的,它们可以从许多不同的线程调用。