Netty ChannelInboundHandlerAdapter async/multithreading

Netty ChannelInboundHandlerAdapter async/multithreading

我无法理解 netty 中多线程背后的概念,EventLoopGroup (MultithreadEventLoopGroup), MultithreadEventExecutorGroup, DefaultEventExecutorGroup

我试图了解服务器如何处理多个客户端同时发送的请求,这些请求将执行一些添加到 RTT 的业务逻辑和 CRUD 操作。下面是我的 netty 服务器代码,它可以工作,但我试图准确理解它如何与并发用户和多个开放通道一起工作。

我有一个简单的ServerBootstrap

@Component
@RequiredArgsConstructor
public class SocketServer {

    private final ContextAwareLogger logger;
    private final ServerInitializer serverInitializer;
    private final NioEventLoopGroup bossGroup;
    private final NioEventLoopGroup workerGroup;

    private Channel mainChannel;

    @PostConstruct
    public void start() {
        try {
            ServerBootstrap bootstrap = init();
            mainChannel = bootstrap.bind(8484).sync().channel(); // save the main channel so we can cleanly close it when app is shutdown
            logger.info("Netty server started...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void stop() throws InterruptedException {
        logger.info("Shutting down Netty server");
        bossGroup.shutdownGracefully().sync();
        workerGroup.shutdownGracefully().sync();
        mainChannel.closeFuture().sync();
        logger.info("Netty Server shutdown complete.");
    }

    private ServerBootstrap init() {
        return new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 5000)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(serverInitializer);
    }

}

ChannelInitializer:

@Component
@RequiredArgsConstructor
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    private final PacketDecoder packetDecoder;
    private final ServerHandler serverHandler;
    private final PacketEncoder packetEncoder;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline()
            .addLast("decoder", packetDecoder) // ByteArrayDecoder
            .addLast("encoder", packetEncoder) // ByteArrayEncoder
            .addLast("inbound", serverHandler); // ChannelInboundHandlerAdapter
    }

}

ChannelInboundHandlerAdapter:

@Component
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    private SomeService someService;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

       // contains db access
       byte[] accept = someService.validateClient(ctx.channel());

       ctx.channel().writeAndFlush(accept);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

       // may contain db access
       byte[] response = someService.processPacket(ctx.channel(), msg));

       ctx.channel().writeAndFlush(response);
    }
}

现在,当客户端连接时,我了解到将打开一个新的 Channel 并重用处理程序。要求是每个客户端 request/response 需要立即处理,而不是等待其他客户端的 CRUD 操作完成。

我的 channelRead 和 channelActive 等是否是异步的,因为我正在使用 NioEventLoopGroup(即每个客户端的通道操作是否会 运行 彼此独立)?

如果单个客户端连续发送多个请求,是否保证它们以相同的顺序处理?

我需要为入站处理程序指定 DefaultEventExecutorGroup 吗? ()

您需要为您的 ServerHandler 使用 DefaultEventExecutorGroup 或将 validateClient(...) / processPacket(...) 分派到您自己的线程池。如果不这样做将导致 EventLoop 线程阻塞,因此在阻塞操作完成之前无法为此 EventLoop 处理其他 IO。