Netty ChannelFuture如何通知不会造成死锁
How Netty ChannelFuture notify does not cause dead lock
我看过 Netty 指南,它对 ChannelFuture
的解释不多。我很困惑为什么它不会导致死锁。
1.It教我这样启动服务器
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup);
sb.channel(NioServerSocketChannel.class);
sb.localAddress(new InetSocketAddress(port));
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new BigGameMessageDecoder(msgRegistry));
ch.pipeline().addLast(BIG_MSG_ENCODER);
if (isDebug) {
ch.pipeline().addLast(MSG_LOGGER);
}
ch.pipeline().addLast(new GameMessageHandler<>(msgRegistry,
sessionFactory.createGameSession(), event, false));
}
});
ChannelFuture cf = sb.bind().sync();
logger.error("start server at port: {}", port);
if (sync) {
cf.channel().closeFuture().sync();
}
行内:
ChannelFuture cf = sb.bind().sync();
sb.bind()
returns ChannelFuture
和 sync()
将等到这个未来完成。
我阅读了 DefaultChannelGroupFuture
代码,它确实向我展示了 sync()
调用 await()
。
并且await()
自己锁定,等待别人的通知。
并且在 ChannelFuture
的函数 setSuccess
中它会再次尝试获取该锁。所以我的问题是,如果 sync()
先获得锁然后等待,然后 ChannelFuture
尝试通知但它无法获得锁。会不会造成死锁?
如果不这样做,ChannelFuture
如何通知其他听众?
其他书告诉我不要在 ChannelHandler
中使用 sync()
或 await()
因为它可能会导致死锁。为什么?问题1和问题3有什么区别?
public DefaultChannelGroupFuture sync() throws InterruptedException {
super.sync();
return this;
}
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
synchronized (this) {
while (!isDone()) {
checkDeadLock();
incWaiters();
try {
wait();
} finally {`enter code here`
decWaiters();
}
}
}
return this;
}
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
if (isDone()) {
return false;
}
synchronized (this) {
// Allow only once.
if (isDone()) {
return false;
}
if (result == null) {
this.result = SUCCESS;
} else {
this.result = result;
}
if (hasWaiters()) {
notifyAll();
}
}
return true;
}
在你上面的代码中:
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
synchronized (this) {
while (!isDone()) {
checkDeadLock();
incWaiters();
try {
wait();
} finally {`enter code here`
decWaiters();
}
}
}
return this;
sync
方法代码checkDeadLock();
将检查当前线程是否是用于处理io事件的内部线程,如果不是,将发生死锁,而绑定操作将调度到相同的等待 lock.And 的线程然后,wait();
将释放 this
的锁并等待某个线程获取锁并通知 it.When IO 线程调用 setSuccess
,它可以获得锁,因为没有人持有锁。
我看过 Netty 指南,它对 ChannelFuture
的解释不多。我很困惑为什么它不会导致死锁。
1.It教我这样启动服务器
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup);
sb.channel(NioServerSocketChannel.class);
sb.localAddress(new InetSocketAddress(port));
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new BigGameMessageDecoder(msgRegistry));
ch.pipeline().addLast(BIG_MSG_ENCODER);
if (isDebug) {
ch.pipeline().addLast(MSG_LOGGER);
}
ch.pipeline().addLast(new GameMessageHandler<>(msgRegistry,
sessionFactory.createGameSession(), event, false));
}
});
ChannelFuture cf = sb.bind().sync();
logger.error("start server at port: {}", port);
if (sync) {
cf.channel().closeFuture().sync();
}
行内:
ChannelFuture cf = sb.bind().sync();
sb.bind()
returns ChannelFuture
和 sync()
将等到这个未来完成。
我阅读了 DefaultChannelGroupFuture
代码,它确实向我展示了 sync()
调用 await()
。
并且await()
自己锁定,等待别人的通知。
并且在 ChannelFuture
的函数 setSuccess
中它会再次尝试获取该锁。所以我的问题是,如果 sync()
先获得锁然后等待,然后 ChannelFuture
尝试通知但它无法获得锁。会不会造成死锁?
如果不这样做,
ChannelFuture
如何通知其他听众?其他书告诉我不要在
ChannelHandler
中使用sync()
或await()
因为它可能会导致死锁。为什么?问题1和问题3有什么区别?
public DefaultChannelGroupFuture sync() throws InterruptedException {
super.sync();
return this;
}
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
synchronized (this) {
while (!isDone()) {
checkDeadLock();
incWaiters();
try {
wait();
} finally {`enter code here`
decWaiters();
}
}
}
return this;
}
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
if (isDone()) {
return false;
}
synchronized (this) {
// Allow only once.
if (isDone()) {
return false;
}
if (result == null) {
this.result = SUCCESS;
} else {
this.result = result;
}
if (hasWaiters()) {
notifyAll();
}
}
return true;
}
在你上面的代码中:
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
synchronized (this) {
while (!isDone()) {
checkDeadLock();
incWaiters();
try {
wait();
} finally {`enter code here`
decWaiters();
}
}
}
return this;
sync
方法代码checkDeadLock();
将检查当前线程是否是用于处理io事件的内部线程,如果不是,将发生死锁,而绑定操作将调度到相同的等待 lock.And 的线程然后,wait();
将释放 this
的锁并等待某个线程获取锁并通知 it.When IO 线程调用 setSuccess
,它可以获得锁,因为没有人持有锁。