Netty ChannelFuture如何通知不会造成死锁

How Netty ChannelFuture notify does not cause dead lock

我看过 Netty 指南,它对 ChannelFuture 的解释不多。我很困惑为什么它不会导致死锁。

1.It教我这样启动服务器

ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup, workerGroup);
        sb.channel(NioServerSocketChannel.class);
        sb.localAddress(new InetSocketAddress(port));
        sb.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new BigGameMessageDecoder(msgRegistry));
                        ch.pipeline().addLast(BIG_MSG_ENCODER);
                    if (isDebug) {
                        ch.pipeline().addLast(MSG_LOGGER);
                    }

                    ch.pipeline().addLast(new GameMessageHandler<>(msgRegistry,
                            sessionFactory.createGameSession(), event, false));
                } 

        });

        ChannelFuture cf = sb.bind().sync();
        logger.error("start server at port: {}", port);
        if (sync) {
            cf.channel().closeFuture().sync();
        }

行内:

ChannelFuture cf = sb.bind().sync();

sb.bind() returns ChannelFuturesync() 将等到这个未来完成。

我阅读了 DefaultChannelGroupFuture 代码,它确实向我展示了 sync() 调用 await()。 并且await()自己锁定,等待别人的通知。

并且在 ChannelFuture 的函数 setSuccess 中它会再次尝试获取该锁。所以我的问题是,如果 sync() 先获得锁然后等待,然后 ChannelFuture 尝试通知但它无法获得锁。会不会造成死锁?

  1. 如果不这样做,ChannelFuture如何通知其他听众?

  2. 其他书告诉我不要在 ChannelHandler 中使用 sync()await() 因为它可能会导致死锁。为什么?问题1和问题3有什么区别?


public DefaultChannelGroupFuture sync() throws InterruptedException {
    super.sync();
    return this;
}

  public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}


  public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }

    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    synchronized (this) {
        while (!isDone()) {
            checkDeadLock();
            incWaiters();
            try {
                wait();
            } finally {`enter code here`
                decWaiters();
            }
        }
    }
    return this;
}

  public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}



  private boolean setSuccess0(V result) {
    if (isDone()) {
        return false;
    }

    synchronized (this) {
        // Allow only once.
        if (isDone()) {
            return false;
        }
        if (result == null) {
            this.result = SUCCESS;
        } else {
            this.result = result;
        }
        if (hasWaiters()) {
            notifyAll();
        }
    }
    return true;
}

在你上面的代码中:

public Promise<V> await() throws InterruptedException {
if (isDone()) {
    return this;
}

if (Thread.interrupted()) {
    throw new InterruptedException(toString());
}

synchronized (this) {
    while (!isDone()) {
        checkDeadLock();
        incWaiters();
        try {
            wait();
        } finally {`enter code here`
            decWaiters();
        }
    }
}
return this;

sync方法代码checkDeadLock();将检查当前线程是否是用于处理io事件的内部线程,如果不是,将发生死锁,而绑定操作将调度到相同的等待 lock.And 的线程然后,wait(); 将释放 this 的锁并等待某个线程获取锁并通知 it.When IO 线程调用 setSuccess,它可以获得锁,因为没有人持有锁。