Netty ChannelGroupFutureListener operationComplete never called
I defined my server as follows, based on the example in the Netty documentation:
    public void start() throws Exception {
        incommingGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        // Create the group before binding so that initChannel never sees a null field.
        channelGroup = new DefaultChannelGroup("group-gpstracker", GlobalEventExecutor.INSTANCE);
        final ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(incommingGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(final Channel channel) throws Exception {
                        final ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast("logger", new LoggingHandler());
                        // Frames are terminated by "\r\n", "\n" or ";".
                        pipeline.addLast("frameDecoder",
                                new DelimiterBasedFrameDecoder(1024,
                                        Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }),
                                        Unpooled.wrappedBuffer(new byte[] { '\n' }),
                                        Unpooled.wrappedBuffer(new byte[] { ';' })));
                        pipeline.addLast("stringDecoder", new StringDecoder());
                        pipeline.addLast("stringEncoder", new StringEncoder());
                        pipeline.addLast(new TrackerChannelOutboundHandler());
                        pipeline.addLast(new TrackerChannelInboundHandler(/* eventAdmin */notificationService));
                        // Track every accepted client channel.
                        channelGroup.add(channel);
                    }
                }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        final ChannelFuture cf = serverBootstrap.bind(notificationService.port());
        final Channel channel = cf.channel();
        // The server channel is tracked too; writes filter it out with isNonServerChannel().
        channelGroup.add(channel);
    }
I want to write a message to all clients like this (a method of the same server class):
    public void write(TrackerMessage trackerMessage) {
        ChannelGroupFuture result = channelGroup.writeAndFlush(trackerMessage, ChannelMatchers.isNonServerChannel());
        result.addListener(new ChannelGroupFutureListener() {
            @Override
            public void operationComplete(ChannelGroupFuture arg0) throws Exception {
                LOGGER.info("Some info");
            }
        });
    }
The TrackerChannelOutboundHandler implementation is:
    public class TrackerChannelOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            if (ctx != null && ctx.channel() != null && ctx.channel().isWritable())
                ctx.writeAndFlush(trackerMessage.getMessage() + "\n");
        }
    }
The clients receive the message successfully, so why is the operationComplete method of the ChannelGroupFutureListener never called?
I am using Netty 4.0.40.
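When you extend ChannelOutboundHandlerAdapter, you must make sure that every interaction with the channel is propagated correctly. In practice that means respecting the ChannelFutures and ChannelPromises you are handed.
At the moment your handler ignores the ChannelPromise passed into write(). ctx.writeAndFlush(Object) creates a fresh promise of its own, so the original promise never receives a success notification and the future returned by the group write is never marked as complete.
Pass the message on with the superclass implementation or with ctx.write(Object, ChannelPromise), handing over the original promise. If you decide not to write the message, mark the promise as failed or succeeded yourself, depending on your intent.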
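    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (ctx != null && ctx.channel() != null && ctx.channel().isWritable()) {
            // Assumes msg is the TrackerMessage that was passed to channelGroup.writeAndFlush(...).
            TrackerMessage trackerMessage = (TrackerMessage) msg;
            // Forward the original promise so listeners on the group future get notified.
            super.write(ctx, trackerMessage.getMessage() + "\n", promise);
        } else {
            // Nothing was written: complete the promise ourselves so the future does not hang.
            promise.setFailure(new java.util.concurrent.CancellationException());
        }
    }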
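To see why the dropped promise matters, here is a minimal sketch that models the situation with plain java.util.concurrent.CompletableFuture instead of Netty. It is an analogy, not Netty code: the Handler interface, transportWrite and the class name PromisePropagationSketch are hypothetical names made up for this illustration. One handler swaps in a new future (like ctx.writeAndFlush(Object)), the other forwards the one it was given (like super.write(ctx, msg, promise)).

```java
import java.util.concurrent.CompletableFuture;

public class PromisePropagationSketch {

    // Hypothetical stand-in for an outbound handler: receives a message and the caller's promise.
    interface Handler {
        void write(String msg, CompletableFuture<Void> promise);
    }

    // Hypothetical stand-in for the transport: completes whichever promise it is handed.
    static void transportWrite(String msg, CompletableFuture<Void> promise) {
        promise.complete(null);
    }

    // Ignores the caller's promise and passes a brand-new one down the line.
    static final Handler DROPS_PROMISE =
            (msg, promise) -> transportWrite(msg, new CompletableFuture<>());

    // Forwards the caller's promise, so its completion reaches the caller's listener.
    static final Handler FORWARDS_PROMISE =
            (msg, promise) -> transportWrite(msg, promise);

    // Returns true if a listener registered on the caller's promise was invoked.
    static boolean listenerFired(Handler handler) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        boolean[] fired = { false };
        promise.whenComplete((value, error) -> fired[0] = true);
        handler.write("hello\n", promise);
        return fired[0];
    }

    public static void main(String[] args) {
        System.out.println("dropping handler notified listener: " + listenerFired(DROPS_PROMISE));
        System.out.println("forwarding handler notified listener: " + listenerFired(FORWARDS_PROMISE));
    }
}
```

In both cases the "write" itself happens, which matches what the question describes: the clients get the message, but only the forwarding variant tells the caller about it.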