如何在 Netty 通道处理程序中安全地执行阻塞操作?
How can you safely perform blocking operations in a Netty channel handler?
我正在构建一个基于 Netty 的小型应用程序,它通过套接字连接(即 telnet/ssh)执行 I/O 操作。我正在用 Netty 的 ServerBootstrap
class 启动我的套接字服务器,给它:
NioEventLoopGroup
类型的事件循环(即不应受到阻塞操作的共享线程池)。
NioServerSocketChannel
类型的频道(我相信这是与上面的#1 相对应所必需的)。
一个非常简单的管道,带有一个扩展 ChannelInboundHandlerAdapter
.
的通道处理程序
只要从客户端套接字连接接收到命令字符串,就会调用我的处理程序的 channelRead(...)
方法,并且 returns 一些响应字符串取决于命令。
对于不涉及阻塞操作的命令,一切都很好。但是,现在我需要从数据库读取或写入一些命令。那些 JDBC 调用本质上会被阻塞...尽管我 可以 使用 CompletableFuture
(或其他)在单独的线程中处理它们。
但即使我 "roll-my-own async" 通过在单独的线程中执行阻塞操作,我也不确定如何将这些衍生线程的结果重新连接回主线程中的 Netty 通道处理程序。
我看到 ChannelHandlerContext
class 有如下方法:
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
... 作为我目前使用的替代品:
ChannelFuture writeAndFlush(Object msg);
但是我找不到任何文档或指南(甚至是有用的 Javadoc)来解释如何在此用例中使用此 ChannelPromise
类型。它的名字表明它 可能 相关,但也可能不相关。毕竟,writeAndFlush
方法仍然将传出消息作为其第一个参数...那么,如果您需要它的结果是,那么将阻塞操作填充到 "promise" 第二个参数中有什么好处呢?第一个参数已经准备好了吗?
这里的正确轨道是什么?有没有办法在单独的线程中处理阻塞操作,让 Netty 的 NioEventLoopGroup
不阻塞?或者这根本不是 Netty 的工作方式,如果您需要支持阻塞,您应该使用不同的事件循环实现(即为每个客户端套接字连接生成一个单独的线程)?
如果 Netty 中的操作需要较长时间才能完成或阻塞,建议在 handler that uses a separate ExecutorGroup
中执行该操作,这样主 EventLoop 线程就不会被阻塞。
您可以在管道创建期间指定。
中使用执行程序组进行数据库操作的示例
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
// in a different thread than an I/O thread so that the I/O thread is not blocked by
// a time-consuming task.
// If your business logic is fully asynchronous or finished very quickly, you don't
// need to specify a group.
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
我正在构建一个基于 Netty 的小型应用程序,它通过套接字连接(即 telnet/ssh)执行 I/O 操作。我正在用 Netty 的 ServerBootstrap
class 启动我的套接字服务器,给它:
NioEventLoopGroup
类型的事件循环(即不应受到阻塞操作的共享线程池)。NioServerSocketChannel
类型的频道(我相信这是与上面的#1 相对应所必需的)。一个非常简单的管道,带有一个扩展
ChannelInboundHandlerAdapter
. 的通道处理程序
只要从客户端套接字连接接收到命令字符串,就会调用我的处理程序的 channelRead(...)
方法,并且 returns 一些响应字符串取决于命令。
对于不涉及阻塞操作的命令,一切都很好。但是,现在我需要从数据库读取或写入一些命令。那些 JDBC 调用本质上会被阻塞...尽管我 可以 使用 CompletableFuture
(或其他)在单独的线程中处理它们。
但即使我 "roll-my-own async" 通过在单独的线程中执行阻塞操作,我也不确定如何将这些衍生线程的结果重新连接回主线程中的 Netty 通道处理程序。
我看到 ChannelHandlerContext
class 有如下方法:
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
... 作为我目前使用的替代品:
ChannelFuture writeAndFlush(Object msg);
但是我找不到任何文档或指南(甚至是有用的 Javadoc)来解释如何在此用例中使用此 ChannelPromise
类型。它的名字表明它 可能 相关,但也可能不相关。毕竟,writeAndFlush
方法仍然将传出消息作为其第一个参数...那么,如果您需要它的结果是,那么将阻塞操作填充到 "promise" 第二个参数中有什么好处呢?第一个参数已经准备好了吗?
这里的正确轨道是什么?有没有办法在单独的线程中处理阻塞操作,让 Netty 的 NioEventLoopGroup
不阻塞?或者这根本不是 Netty 的工作方式,如果您需要支持阻塞,您应该使用不同的事件循环实现(即为每个客户端套接字连接生成一个单独的线程)?
如果 Netty 中的操作需要较长时间才能完成或阻塞,建议在 handler that uses a separate ExecutorGroup
中执行该操作,这样主 EventLoop 线程就不会被阻塞。
您可以在管道创建期间指定。
中使用执行程序组进行数据库操作的示例static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
// in a different thread than an I/O thread so that the I/O thread is not blocked by
// a time-consuming task.
// If your business logic is fully asynchronous or finished very quickly, you don't
// need to specify a group.
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());