使用 Netty 构建只有几个客户端的服务器
Using Netty to build a server with only few clients
我熟悉 Netty
基础知识,并使用它在 TCP 上构建了一个典型的应用程序服务器 运行,旨在为许多 clients/connections 服务。但是,我最近需要构建一个服务器,该服务器旨在处理少数客户端或大多数时候只处理一个客户端。但客户端是许多设备的网关,因此会产生大量流量到我试图设计的服务器。
我的问题是:
对于这个用例是否可以/完全推荐使用 Netty
?我看过讨论here.
是否可以将多线程EventExecutor用于pipeline中的channel handlers,而不是channel EventLoop,而是由EventExecutor线程池实现并发?它会确保来自客户端的一条消息将由一个线程通过所有处理程序处理,而下一条消息由另一个线程处理吗?
是否有可用的示例实现?
根据 io.netty.channel.oio
的 documentation,如果您没有很多客户,您可以使用它。在这种情况下,每个连接都将在一个单独的线程中处理,并在后台使用 Java 旧的阻塞 IO。看看 OioByteStreamChannel::activate
:
/**
* Activate this instance. After this call {@link #isActive()} will return {@code true}.
*/
protected final void activate(InputStream is, OutputStream os) {
if (this.is != null) {
throw new IllegalStateException("input was set already");
}
if (this.os != null) {
throw new IllegalStateException("output was set already");
}
if (is == null) {
throw new NullPointerException("is");
}
if (os == null) {
throw new NullPointerException("os");
}
this.is = is;
this.os = os;
}
如您所见,oio Stream
s 将在那里使用。
根据您的评论。您可以在将处理程序添加到管道时指定 EventExecutorGroup
,如下所示:
new ChannelInitializer<Channel> {
public void initChannel(Channel ch) {
ch.pipeline().addLast(new YourHandler());
}
}
我们来看看AbstractChannelHandlerContext
:
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
在这里我们看到,如果您不注册 EventExecutor
,它将使用您在创建 ServerBootstrap
.
时指定的子事件组
new ServerBootstrap()
.group(new OioEventLoopGroup(), new OioEventLoopGroup())
//acceptor group //child group
调用通道读取的方式如下AbstractChannelHandlerContext::invokeChannelRead
:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() { //Invoked by the EventExecutor you specified
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
即使有几个连接,我也会选择 NioEventLoopGroup
。
关于您的问题:
Is it possible to use multithreaded EventExecutor to the channel
handlers in the pipeline so that instead of channel EventLoop, the
concurrency is achieved by the EventExecutor thread pool? Will it
ensure that one message from the client will be handled by one thread
through all handlers, while the next message by another thread?
Netty 的 Channel
保证对入站或出站消息的每个处理都将在同一线程中进行。您不必自己破解 EventExecutor
来处理这个问题。如果提供入站消息不需要长时间处理,您的代码将看起来像 ServerBootstrap
的基本用法。您可能会发现调整池中的线程数很有用。
我熟悉 Netty
基础知识,并使用它在 TCP 上构建了一个典型的应用程序服务器 运行,旨在为许多 clients/connections 服务。但是,我最近需要构建一个服务器,该服务器旨在处理少数客户端或大多数时候只处理一个客户端。但客户端是许多设备的网关,因此会产生大量流量到我试图设计的服务器。
我的问题是:
对于这个用例是否可以/完全推荐使用
Netty
?我看过讨论here.是否可以将多线程EventExecutor用于pipeline中的channel handlers,而不是channel EventLoop,而是由EventExecutor线程池实现并发?它会确保来自客户端的一条消息将由一个线程通过所有处理程序处理,而下一条消息由另一个线程处理吗?
是否有可用的示例实现?
根据 io.netty.channel.oio
的 documentation,如果您没有很多客户,您可以使用它。在这种情况下,每个连接都将在一个单独的线程中处理,并在后台使用 Java 旧的阻塞 IO。看看 OioByteStreamChannel::activate
:
/**
* Activate this instance. After this call {@link #isActive()} will return {@code true}.
*/
protected final void activate(InputStream is, OutputStream os) {
if (this.is != null) {
throw new IllegalStateException("input was set already");
}
if (this.os != null) {
throw new IllegalStateException("output was set already");
}
if (is == null) {
throw new NullPointerException("is");
}
if (os == null) {
throw new NullPointerException("os");
}
this.is = is;
this.os = os;
}
如您所见,oio Stream
s 将在那里使用。
根据您的评论。您可以在将处理程序添加到管道时指定 EventExecutorGroup
,如下所示:
new ChannelInitializer<Channel> {
public void initChannel(Channel ch) {
ch.pipeline().addLast(new YourHandler());
}
}
我们来看看AbstractChannelHandlerContext
:
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
在这里我们看到,如果您不注册 EventExecutor
,它将使用您在创建 ServerBootstrap
.
new ServerBootstrap()
.group(new OioEventLoopGroup(), new OioEventLoopGroup())
//acceptor group //child group
调用通道读取的方式如下AbstractChannelHandlerContext::invokeChannelRead
:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() { //Invoked by the EventExecutor you specified
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
即使有几个连接,我也会选择 NioEventLoopGroup
。
关于您的问题:
Is it possible to use multithreaded EventExecutor to the channel handlers in the pipeline so that instead of channel EventLoop, the concurrency is achieved by the EventExecutor thread pool? Will it ensure that one message from the client will be handled by one thread through all handlers, while the next message by another thread?
Netty 的 Channel
保证对入站或出站消息的每个处理都将在同一线程中进行。您不必自己破解 EventExecutor
来处理这个问题。如果提供入站消息不需要长时间处理,您的代码将看起来像 ServerBootstrap
的基本用法。您可能会发现调整池中的线程数很有用。