Netty ChannelGroupFutureListener operationComplete 从未调用过

Netty ChannelGroupFutureListener operationComplete never called

我根据 Netty 文档对服务器进行了以下定义 example:

public void start() throws Exception {

    incommingGroup = new NioEventLoopGroup();
    workerGroup = new NioEventLoopGroup();

    final ServerBootstrap serverBootstrap = new ServerBootstrap();

    serverBootstrap.group(incommingGroup, workerGroup).channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(final Channel channel) throws Exception {
                    final ChannelPipeline pipeline = channel.pipeline();

                    pipeline.addLast("logger", new LoggingHandler());

                    pipeline.addLast("frameDecoder",
                            new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }),
                                    Unpooled.wrappedBuffer(new byte[] { '\n' }),
                                    Unpooled.wrappedBuffer(new byte[] { ';' })));

                    pipeline.addLast("stringDecoder", new StringDecoder());
                    pipeline.addLast("stringEncoder", new StringEncoder());

                    pipeline.addLast(new TrackerChannelOutboundHandler());

                    pipeline.addLast(new TrackerChannelInboundHandler(/* eventAdmin */notificationService));

                    channelGroup.add(channel);
                }
            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    final ChannelFuture cf = serverBootstrap.bind(notificationService.port());

    final Channel channel = cf.channel();

    channelGroup = new DefaultChannelGroup("group-gpstracker", GlobalEventExecutor.INSTANCE);
    channelGroup.add(channel);
}

我想像这样向所有客户端写一些消息(同一服务器的方法class):

public void write(TrackerMessage trackerMessage) {

    ChannelGroupFuture result = channelGroup.writeAndFlush(trackerMessage, ChannelMatchers.isNonServerChannel());

    result.addListener(new ChannelGroupFutureListener() {

        @Override
        public void operationComplete(ChannelGroupFuture arg0) throws Exception {
            LOGGER.info("Some info");
        }

    });
}

TrackerChannelOutboundHandler 实现是:

public class TrackerChannelOutboundHandler extends ChannelOutboundHandlerAdapter {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {

    if (ctx != null && ctx.channel() != null && ctx.channel().isWritable()) 
        ctx.writeAndFlush(trackerMessage.getMessage() + "\n");

}

}

客户端成功接收消息,但为什么ChannelGroupFutureListeneroperationComplete方法从未被调用?

我使用的Netty版本:4.0.40.

扩展 ChannelOutboundHandlerAdapter 时,务必确保与频道的所有本机交互都得到正确传播。这应该通过尊重 ChannelFutures 和 ChannelPromises 来完成。

目前,您正在忽略上游 ChannelPromise,阻止传递任何上游成功通知,因此您的未来永远不会被标记为成功。

您应该使用超类实现或 ctx.write(Object, ChannelPromise) 方法将您的消息正确地传递给上游,并且在出现错误的情况下,您应该手动将未来标记为失败或成功, 取决于您的意图。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    if (ctx != null && ctx.channel() != null && ctx.channel().isWritable()) 
        super.write(ctx, trackerMessage.getMessage() + "\n", promise);
    else
        promise.setFailure(new java.util.concurrent.CancellationException());
}