如何使用 netty 在单独的线程池中执行业务逻辑处理程序

How to execute business logic handler in a separate thread pool using netty

我有一个处理程序需要执行一些业务逻辑,我希望它在单独的线程池中执行,以免阻塞 io 事件循环。我已按照 http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html javadoc and http://netty.io/wiki/new-and-noteworthy-in-4.0.html#no-more-executionhandler---its-in-the-core wiki:

中的指定将 DefaultEventExecutorGroup 添加到管道中
ch.pipeline().addLast(new DefaultEventExecutorGroup(10), new ServerHandler());

出于测试目的,我的 ServerHandler 只是让当前线程休眠 5 秒:

protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {
    System.out.println("Starting.");

    try {
        Thread.currentThread().sleep(5000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    System.out.println("Finished.");        
}

但是显然业务逻辑还是同步执行的:

Starting.
Finished.
Starting.
Finished.
Starting.
Finished.

我错过了什么?

因为Netty处理同一个套接字发送的请求,由同一个EventExecutor,所以你可以启动多个客户端,并查看结果。

如果您的目标不是阻止 IO 事件循环 - 您做对了。但是由于特定于 netty,您的处理程序将始终附加到 EventExecutorGroup 的同一线程,因此您上面描述的行为是预期的。

如果你想在它到达时立即并行执行阻塞操作,你需要使用另一种方式 - 分开 ThreadPoolExecutor。像这样:

ch.pipeline().addLast(new ServerHandler(blockingThreadPool));

其中 blockingThreadPool 是常规 ThreadPoolExecutor

例如:

ExecutorService blockingThreadPool = Executors.newFixedThreadPool(10);

现在,在您的逻辑处理程序中,您可以像这样向该执行程序提交阻塞任务:

protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {

    blockingIOProcessor.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Starting.");

            try {
                Thread.currentThread().sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println("Finished.");
        }
    });

}

您还可以将上下文传递给此 runnable,以便 return 如果需要,在处理完成时返回响应。