如何使用 netty 在单独的线程池中执行业务逻辑处理程序
How to execute business logic handler in a separate thread pool using netty
我有一个处理程序需要执行一些业务逻辑,我希望它在单独的线程池中执行,以免阻塞 io 事件循环。我已按照 http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html javadoc and http://netty.io/wiki/new-and-noteworthy-in-4.0.html#no-more-executionhandler---its-in-the-core wiki:
中的指定将 DefaultEventExecutorGroup 添加到管道中
ch.pipeline().addLast(new DefaultEventExecutorGroup(10), new ServerHandler());
出于测试目的,我的 ServerHandler 只是让当前线程休眠 5 秒:
protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {
System.out.println("Starting.");
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Finished.");
}
但是显然业务逻辑还是同步执行的:
Starting.
Finished.
Starting.
Finished.
Starting.
Finished.
我错过了什么?
因为Netty
处理同一个套接字发送的请求,由同一个EventExecutor
,所以你可以启动多个客户端,并查看结果。
如果您的目标不是阻止 IO 事件循环 - 您做对了。但是由于特定于 netty,您的处理程序将始终附加到 EventExecutorGroup 的同一线程,因此您上面描述的行为是预期的。
如果你想在它到达时立即并行执行阻塞操作,你需要使用另一种方式 - 分开 ThreadPoolExecutor
。像这样:
ch.pipeline().addLast(new ServerHandler(blockingThreadPool));
其中 blockingThreadPool
是常规 ThreadPoolExecutor
。
例如:
ExecutorService blockingThreadPool = Executors.newFixedThreadPool(10);
现在,在您的逻辑处理程序中,您可以像这样向该执行程序提交阻塞任务:
protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {
blockingIOProcessor.execute(new Runnable() {
@Override
public void run() {
System.out.println("Starting.");
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Finished.");
}
});
}
您还可以将上下文传递给此 runnable,以便 return 如果需要,在处理完成时返回响应。
我有一个处理程序需要执行一些业务逻辑,我希望它在单独的线程池中执行,以免阻塞 io 事件循环。我已按照 http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html javadoc and http://netty.io/wiki/new-and-noteworthy-in-4.0.html#no-more-executionhandler---its-in-the-core wiki:
中的指定将 DefaultEventExecutorGroup 添加到管道中ch.pipeline().addLast(new DefaultEventExecutorGroup(10), new ServerHandler());
出于测试目的,我的 ServerHandler 只是让当前线程休眠 5 秒:
protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {
System.out.println("Starting.");
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Finished.");
}
但是显然业务逻辑还是同步执行的:
Starting.
Finished.
Starting.
Finished.
Starting.
Finished.
我错过了什么?
因为Netty
处理同一个套接字发送的请求,由同一个EventExecutor
,所以你可以启动多个客户端,并查看结果。
如果您的目标不是阻止 IO 事件循环 - 您做对了。但是由于特定于 netty,您的处理程序将始终附加到 EventExecutorGroup 的同一线程,因此您上面描述的行为是预期的。
如果你想在它到达时立即并行执行阻塞操作,你需要使用另一种方式 - 分开 ThreadPoolExecutor
。像这样:
ch.pipeline().addLast(new ServerHandler(blockingThreadPool));
其中 blockingThreadPool
是常规 ThreadPoolExecutor
。
例如:
ExecutorService blockingThreadPool = Executors.newFixedThreadPool(10);
现在,在您的逻辑处理程序中,您可以像这样向该执行程序提交阻塞任务:
protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {
blockingIOProcessor.execute(new Runnable() {
@Override
public void run() {
System.out.println("Starting.");
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Finished.");
}
});
}
您还可以将上下文传递给此 runnable,以便 return 如果需要,在处理完成时返回响应。