Netty ChannelInboundHandlerAdapter async/multithreading
Netty ChannelInboundHandlerAdapter async/multithreading
我无法理解 netty 中多线程背后的概念,EventLoopGroup
(MultithreadEventLoopGroup
), MultithreadEventExecutorGroup
, DefaultEventExecutorGroup
我试图了解服务器如何处理多个客户端同时发送的请求,这些请求将执行一些添加到 RTT 的业务逻辑和 CRUD 操作。下面是我的 netty 服务器代码,它可以工作,但我试图准确理解它如何与并发用户和多个开放通道一起工作。
我有一个简单的ServerBootstrap
@Component
@RequiredArgsConstructor
public class SocketServer {
private final ContextAwareLogger logger;
private final ServerInitializer serverInitializer;
private final NioEventLoopGroup bossGroup;
private final NioEventLoopGroup workerGroup;
private Channel mainChannel;
@PostConstruct
public void start() {
try {
ServerBootstrap bootstrap = init();
mainChannel = bootstrap.bind(8484).sync().channel(); // save the main channel so we can cleanly close it when app is shutdown
logger.info("Netty server started...");
} catch (Exception e) {
e.printStackTrace();
}
}
@PreDestroy
public void stop() throws InterruptedException {
logger.info("Shutting down Netty server");
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
mainChannel.closeFuture().sync();
logger.info("Netty Server shutdown complete.");
}
private ServerBootstrap init() {
return new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 5000)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(serverInitializer);
}
}
ChannelInitializer:
@Component
@RequiredArgsConstructor
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
private final PacketDecoder packetDecoder;
private final ServerHandler serverHandler;
private final PacketEncoder packetEncoder;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast("decoder", packetDecoder) // ByteArrayDecoder
.addLast("encoder", packetEncoder) // ByteArrayEncoder
.addLast("inbound", serverHandler); // ChannelInboundHandlerAdapter
}
}
ChannelInboundHandlerAdapter:
@Component
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Autowired
private SomeService someService;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// contains db access
byte[] accept = someService.validateClient(ctx.channel());
ctx.channel().writeAndFlush(accept);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// may contain db access
byte[] response = someService.processPacket(ctx.channel(), msg));
ctx.channel().writeAndFlush(response);
}
}
现在,当客户端连接时,我了解到将打开一个新的 Channel 并重用处理程序。要求是每个客户端 request/response 需要立即处理,而不是等待其他客户端的 CRUD 操作完成。
我的 channelRead 和 channelActive 等是否是异步的,因为我正在使用 NioEventLoopGroup
(即每个客户端的通道操作是否会 运行 彼此独立)?
如果单个客户端连续发送多个请求,是否保证它们以相同的顺序处理?
我需要为入站处理程序指定 DefaultEventExecutorGroup
吗? ()
您需要为您的 ServerHandler
使用 DefaultEventExecutorGroup
或将 validateClient(...)
/ processPacket(...)
分派到您自己的线程池。如果不这样做将导致 EventLoop
线程阻塞,因此在阻塞操作完成之前无法为此 EventLoop
处理其他 IO。
我无法理解 netty 中多线程背后的概念,EventLoopGroup
(MultithreadEventLoopGroup
), MultithreadEventExecutorGroup
, DefaultEventExecutorGroup
我试图了解服务器如何处理多个客户端同时发送的请求,这些请求将执行一些添加到 RTT 的业务逻辑和 CRUD 操作。下面是我的 netty 服务器代码,它可以工作,但我试图准确理解它如何与并发用户和多个开放通道一起工作。
我有一个简单的ServerBootstrap
@Component
@RequiredArgsConstructor
public class SocketServer {
private final ContextAwareLogger logger;
private final ServerInitializer serverInitializer;
private final NioEventLoopGroup bossGroup;
private final NioEventLoopGroup workerGroup;
private Channel mainChannel;
@PostConstruct
public void start() {
try {
ServerBootstrap bootstrap = init();
mainChannel = bootstrap.bind(8484).sync().channel(); // save the main channel so we can cleanly close it when app is shutdown
logger.info("Netty server started...");
} catch (Exception e) {
e.printStackTrace();
}
}
@PreDestroy
public void stop() throws InterruptedException {
logger.info("Shutting down Netty server");
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
mainChannel.closeFuture().sync();
logger.info("Netty Server shutdown complete.");
}
private ServerBootstrap init() {
return new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 5000)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(serverInitializer);
}
}
ChannelInitializer:
@Component
@RequiredArgsConstructor
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
private final PacketDecoder packetDecoder;
private final ServerHandler serverHandler;
private final PacketEncoder packetEncoder;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast("decoder", packetDecoder) // ByteArrayDecoder
.addLast("encoder", packetEncoder) // ByteArrayEncoder
.addLast("inbound", serverHandler); // ChannelInboundHandlerAdapter
}
}
ChannelInboundHandlerAdapter:
@Component
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Autowired
private SomeService someService;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// contains db access
byte[] accept = someService.validateClient(ctx.channel());
ctx.channel().writeAndFlush(accept);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// may contain db access
byte[] response = someService.processPacket(ctx.channel(), msg));
ctx.channel().writeAndFlush(response);
}
}
现在,当客户端连接时,我了解到将打开一个新的 Channel 并重用处理程序。要求是每个客户端 request/response 需要立即处理,而不是等待其他客户端的 CRUD 操作完成。
我的 channelRead 和 channelActive 等是否是异步的,因为我正在使用 NioEventLoopGroup
(即每个客户端的通道操作是否会 运行 彼此独立)?
如果单个客户端连续发送多个请求,是否保证它们以相同的顺序处理?
我需要为入站处理程序指定 DefaultEventExecutorGroup
吗? (
您需要为您的 ServerHandler
使用 DefaultEventExecutorGroup
或将 validateClient(...)
/ processPacket(...)
分派到您自己的线程池。如果不这样做将导致 EventLoop
线程阻塞,因此在阻塞操作完成之前无法为此 EventLoop
处理其他 IO。