使用 Netty 同时处理来自单个客户端的多个请求
Concurrently handling multiple requests from a single client with Netty
我正在调整基本的 Netty Discard 服务器示例,以了解 Netty 如何处理来自同一连接客户端的并发请求。我有以下处理程序....
private static class Handler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext arg0, String arg1) throws Exception {
System.out.println("received " + arg1);
Thread.sleep(10000);
System.out.println("woke up");
}
}
然后服务器站起来如下....
public static void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup(3);
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new Handler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(4744).sync(); // (7)
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
服务器 运行 这样...
public static void main(String[] args) {
System.out.println("heere");
Executors.newSingleThreadExecutor().execute(() -> {
try {
run();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
} here
我的目标是查看具有多个线程的处理程序 运行,由交织的 'receive' 和 'woke up' 流量指示。我的印象是工作组是为请求提供服务的线程,我有 3 个分配给它,但我没有看到我期望的并发性。我做错了什么。
我只是快速运行你的代码,看看没有错。它也可能与 运行 的顺序和时间有关。因此,我建议您按照以下方式更改我们的处理程序。基本上,添加线程名称并将睡眠时间减少到 1 秒。请在下面找到示例输出,我可以看到所有 3 个线程同时处理不同的请求。
protected void channelRead0(ChannelHandlerContext arg0, String arg1) throws Exception {
System.out.println("received " + Thread.currentThread().getName() );
Thread.sleep(1000);
System.out.println("woke up " + Thread.currentThread().getName());
}
输出
...
received nioEventLoopGroup-3-1
received nioEventLoopGroup-3-2
woke up nioEventLoopGroup-3-1
received nioEventLoopGroup-3-1
woke up nioEventLoopGroup-3-2
received nioEventLoopGroup-3-2
woke up nioEventLoopGroup-3-1
received nioEventLoopGroup-3-1
woke up nioEventLoopGroup-3-2
received nioEventLoopGroup-3-2
received nioEventLoopGroup-3-3
...
您的 Handler
正在阻塞调用线程,这可能可以模拟繁重的工作负载(无论 CPU 使用情况如何)但 Netty 不应该 运行 持久EventLoop
中的任务,因为它们阻止了应处理入站消息的线程的重用。您的 run()
方法正在阻止调用者(使用 f.channel().closeFuture().sync()
)。调用者是 SingleThreadExecutor
中的 main()
方法。因此,您一次 运行 连接一台服务器,它一次接受一个连接。
@Apolozeus 的回答是正确的,因为减少睡眠延迟会让您更接近 Netty 旨在支持的行为。
您想试验来自同一客户端的并发连接。除非您编写特定的代码,否则连接的是同一个客户端(从服务器的角度来看是相同的远程 IP 地址)还是不同的客户端都没有关系。
您的代码非常奇怪,因为您多次 运行 服务器。这表明 f.channel().closeFuture().sync()
是一个阻塞调用。通过单个服务器将 "sleep task" 卸载到另一个执行程序,您可以获得更接近真实应用程序的行为。 "sleep task" 还应该向客户端发送一些响应并关闭 Channel
.
我正在调整基本的 Netty Discard 服务器示例,以了解 Netty 如何处理来自同一连接客户端的并发请求。我有以下处理程序....
private static class Handler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext arg0, String arg1) throws Exception {
System.out.println("received " + arg1);
Thread.sleep(10000);
System.out.println("woke up");
}
}
然后服务器站起来如下....
public static void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup(3);
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new Handler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(4744).sync(); // (7)
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
服务器 运行 这样...
public static void main(String[] args) {
System.out.println("heere");
Executors.newSingleThreadExecutor().execute(() -> {
try {
run();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
} here
我的目标是查看具有多个线程的处理程序 运行,由交织的 'receive' 和 'woke up' 流量指示。我的印象是工作组是为请求提供服务的线程,我有 3 个分配给它,但我没有看到我期望的并发性。我做错了什么。
我只是快速运行你的代码,看看没有错。它也可能与 运行 的顺序和时间有关。因此,我建议您按照以下方式更改我们的处理程序。基本上,添加线程名称并将睡眠时间减少到 1 秒。请在下面找到示例输出,我可以看到所有 3 个线程同时处理不同的请求。
protected void channelRead0(ChannelHandlerContext arg0, String arg1) throws Exception {
System.out.println("received " + Thread.currentThread().getName() );
Thread.sleep(1000);
System.out.println("woke up " + Thread.currentThread().getName());
}
输出
...
received nioEventLoopGroup-3-1
received nioEventLoopGroup-3-2
woke up nioEventLoopGroup-3-1
received nioEventLoopGroup-3-1
woke up nioEventLoopGroup-3-2
received nioEventLoopGroup-3-2
woke up nioEventLoopGroup-3-1
received nioEventLoopGroup-3-1
woke up nioEventLoopGroup-3-2
received nioEventLoopGroup-3-2
received nioEventLoopGroup-3-3
...
您的 Handler
正在阻塞调用线程,这可能可以模拟繁重的工作负载(无论 CPU 使用情况如何)但 Netty 不应该 运行 持久EventLoop
中的任务,因为它们阻止了应处理入站消息的线程的重用。您的 run()
方法正在阻止调用者(使用 f.channel().closeFuture().sync()
)。调用者是 SingleThreadExecutor
中的 main()
方法。因此,您一次 运行 连接一台服务器,它一次接受一个连接。
@Apolozeus 的回答是正确的,因为减少睡眠延迟会让您更接近 Netty 旨在支持的行为。
您想试验来自同一客户端的并发连接。除非您编写特定的代码,否则连接的是同一个客户端(从服务器的角度来看是相同的远程 IP 地址)还是不同的客户端都没有关系。
您的代码非常奇怪,因为您多次 运行 服务器。这表明 f.channel().closeFuture().sync()
是一个阻塞调用。通过单个服务器将 "sleep task" 卸载到另一个执行程序,您可以获得更接近真实应用程序的行为。 "sleep task" 还应该向客户端发送一些响应并关闭 Channel
.