带 Netty 的多线程 UDP 服务器
Multi threaded UDP server with Netty
我正在尝试使用 Netty 实现 UDP 服务器。这个想法是只绑定一次(因此只创建一个 Channel
)。此 Channel
仅使用一个处理程序进行初始化,该处理程序通过 ExecutorService
.
在多个线程之间分派传入数据报的处理。
@Configuration
public class SpringConfig {
@Autowired
private Dispatcher dispatcher;
private String host;
private int port;
@Bean
public Bootstrap bootstrap() throws Exception {
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(dispatcher);
ChannelFuture future = bootstrap.bind(host, port).await();
if(!future.isSuccess())
throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());
return bootstrap;
}
}
@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {
private int workerThreads;
private ExecutorService executorService;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg;
final Channel channel = ctx.channel();
executorService.execute(new Runnable() {
@Override
public void run() {
//Process the packet and produce a response packet (below)
DatagramPacket responsePacket = ...;
ChannelFuture future;
try {
future = channel.writeAndFlush(responsePacket).await();
} catch (InterruptedException e) {
return;
}
if(!future.isSuccess())
log.warn("Failed to write response packet.");
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
executorService = Executors.newFixedThreadPool(workerThreads);
}
}
我有以下问题:
Dispatcher
class的channelRead
方法接收到的DatagramPacket
在被工作线程使用之前是否应该复制?我想知道这个数据包是否在 channelRead
方法 returns 之后被销毁,即使工作线程保留了一个引用。
- 在所有工作线程之间共享
Channel
并让它们同时调用 writeAndFlush
是否安全?
谢谢!
没有。如果你需要这个对象活得更久,你要么把它变成别的东西,要么使用 ReferenceCountUtil.retain(datagram)
,然后在你完成它后使用 ReferenceCountUtil.release(datagram)
。你也不应该在执行者服务上做 await()
,你应该为发生的任何事情注册一个处理程序。
是的,通道对象是线程安全的,它们可以从许多不同的线程调用。
我正在尝试使用 Netty 实现 UDP 服务器。这个想法是只绑定一次(因此只创建一个 Channel
)。此 Channel
仅使用一个处理程序进行初始化,该处理程序通过 ExecutorService
.
@Configuration
public class SpringConfig {
@Autowired
private Dispatcher dispatcher;
private String host;
private int port;
@Bean
public Bootstrap bootstrap() throws Exception {
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(dispatcher);
ChannelFuture future = bootstrap.bind(host, port).await();
if(!future.isSuccess())
throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());
return bootstrap;
}
}
@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {
private int workerThreads;
private ExecutorService executorService;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg;
final Channel channel = ctx.channel();
executorService.execute(new Runnable() {
@Override
public void run() {
//Process the packet and produce a response packet (below)
DatagramPacket responsePacket = ...;
ChannelFuture future;
try {
future = channel.writeAndFlush(responsePacket).await();
} catch (InterruptedException e) {
return;
}
if(!future.isSuccess())
log.warn("Failed to write response packet.");
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
executorService = Executors.newFixedThreadPool(workerThreads);
}
}
我有以下问题:
Dispatcher
class的channelRead
方法接收到的DatagramPacket
在被工作线程使用之前是否应该复制?我想知道这个数据包是否在channelRead
方法 returns 之后被销毁,即使工作线程保留了一个引用。- 在所有工作线程之间共享
Channel
并让它们同时调用writeAndFlush
是否安全?
谢谢!
没有。如果你需要这个对象活得更久,你要么把它变成别的东西,要么使用
ReferenceCountUtil.retain(datagram)
,然后在你完成它后使用ReferenceCountUtil.release(datagram)
。你也不应该在执行者服务上做await()
,你应该为发生的任何事情注册一个处理程序。是的,通道对象是线程安全的,它们可以从许多不同的线程调用。