写入和刷新太多太快时 Netty 通道失败
Netty Channel fail when write and flush too many and too fast
当我编写生产者向我的服务器发布消息时。我看过这个:
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
我到处搜索,被告知因为频道已关闭。
但是,在我的代码中。当我的频道池破坏频道时,我只是关闭我的频道。
这是我的代码:
public static class ChannelFactory implements PoolableObjectFactory<Channel> {
private final Bootstrap bootstrap;
private String host;
private int port;
public ChannelFactory(Bootstrap bootstrap, String host, int port) {
this.bootstrap = bootstrap;
this.host = host;
this.port = port;
}
@Override
public Channel makeObject() throws Exception {
System.out.println("Create new channel!!!");
bootstrap.validate();
return bootstrap.connect(host, port).channel();
}
@Override
public void destroyObject(Channel channel) throws Exception {
ChannelFuture close = channel.close();
if (close.isSuccess()) {
System.out.println(channel + " close successfully");
}
}
@Override
public boolean validateObject(Channel channel) {
System.out.println("Validate object");
return (channel.isOpen());
}
@Override
public void activateObject(Channel channel) throws Exception {
System.out.println(channel + " is activated");
}
@Override
public void passivateObject(Channel channel) throws Exception {
System.out.println(channel + " is passivated");
}
/**
* @return the host
*/
public String getHost() {
return host;
}
/**
* @param host the host to set
* @return
*/
public ChannelFactory setHost(String host) {
this.host = host;
return this;
}
/**
* @return the port
*/
public int getPort() {
return port;
}
/**
* @param port the port to set
* @return
*/
public ChannelFactory setPort(int port) {
this.port = port;
return this;
}
}
这是我的亚军:
public static class Runner implements Runnable {
private Channel channel;
private ButtyMessage message;
private MyChannelPool channelPool;
public Runner(MyChannelPool channelPool, Channel channel, ButtyMessage message) {
this.channel = channel;
this.message = message;
this.channelPool = channelPool;
}
@Override
public void run() {
channel.writeAndFlush(message.content()).syncUninterruptibly().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channelPool.returnObject(future.channel());
}
});
}
}
还有我的主要:
public static void main(String[] args) throws InterruptedException {
final String host = "127.0.0.1";
final int port = 8080;
int jobSize = 100;
int jobNumber = 10000;
final Bootstrap b = func(host, port);
final MyChannelPool channelPool = new MyChannelPool(new ChannelFactory(b, host, port));
ExecutorService threadPool = Executors.newFixedThreadPool(1);
for (int i = 0; i < jobNumber; i++) {
try {
threadPool.execute(new Runner(channelPool, channelPool.borrowObject(), new ButtyMessage()));
} catch (Exception ex) {
System.out.println("ex = " + ex.getMessage());
}
}
}
用 ButtyMessage
扩展 ByteBufHolder
。
在我的 Runner class 中,如果我在 writeAndFlush
之后睡眠 (10),它 运行 很好。但我不想回复睡眠。所以我使用ChannelFutureListener
,但结果很糟糕。如果我发送大约 1000 到 10.000 条消息,它将崩溃并在上面抛出异常。有什么办法可以避免这种情况吗?
谢谢大家。
抱歉我的解释和英语不好:)
您有几个问题可以解释这一点。大部分都和异步操作的错误用法以及以后的用法有关。
不知道是不是link你的问题,但是如果真的要在频道真正关闭的时候打印,就得等以后了,因为未来 close()
(或任何其他操作)立即 returns,无需等待真正的关闭。因此,您的测试 if (close.isSuccess())
应始终为假。
public void destroyObject(final Channel channel) throws Exception {
channel.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture close) {
if (close.isSuccess()) {
System.out.println(channel + " close successfully");
}
}
});
}
然而,我认为它只是为了调试目的,它不是强制性的。
另一个:你将一个尚未连接的频道发回你的池(这也许可以解释你的睡眠(10)?)。您必须等待 connect()
.
public Channel makeObject() throws Exception {
System.out.println("Create new channel!!!");
//bootstrap.validate(); // this is implicitely called in connect()
ChannelFuture future = bootstrap.connect(host, port).awaitUninterruptibly();
if (future.isSuccess()) {
return future.channel();
} else {
// do what you need to do when the connection is not done
}
}
第三个:使用 isActive()
:
验证已连接的频道可能会更好
@Override
public boolean validateObject(Channel channel) {
System.out.println("Validate object");
return channel.isActive(); // instead of isOpen()
}
第四个:在你的跑步者中,你错误地等待未来,而你不应该。您可以删除您的 syncUninterruptibly()
并让其余的保持原样。
@Override
public void run() {
Channel.writeAndFlush(message.content()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channelPool.returnObject(future.channel());
}
});
}
最后,我想你知道你的测试是完全顺序的(你的池中有 1 个线程),这样每个客户端都会重复使用同一个通道?
您能否尝试更改这 4 个点,看看是否可以解决您的问题?
编辑: 在请求者评论后
对于syncUntinterruptibly()
,我没有仔细看。如果你想在写时阻塞,那么你不需要额外的 addListener
因为一旦同步结束,未来就完成了。因此,您可以在同步后直接调用 channelPool.returnObject
作为下一个命令。
所以你应该这样写,更简单。
@Override
public void run() {
Channel.writeAndFlush(message.content()).syncUntinterruptibly();
channelPool.returnObject(future.channel());
}
对于 fireChannelActive
,它将在连接完成后立即调用(因此从 makeObject
开始,在将来的某个时间)。此外,一旦断开连接(正如您在异常中确实注意到的那样),该通道将不再可用,必须从零重新创建。所以我建议使用 isActive 但是,如果不活动,它将使用 destroyObject 删除...
看看通道状态模型here。
终于,我找到了适合自己的解决方案。但是,我仍在考虑另一种解决方案。 (此解决方案完全复制自 4.0.28 netty 发行说明)
final String host = "127.0.0.1";
final int port = 8080;
int jobNumber = 100000;
final EventLoopGroup group = new NioEventLoopGroup(100);
ChannelPoolMap<InetSocketAddress, MyChannelPool> poolMap = new AbstractChannelPoolMap<InetSocketAddress, MyChannelPool>() {
@Override
protected MyChannelPool newPool(InetSocketAddress key) {
Bootstrap bootstrap = func(group, key.getHostName(), key.getPort());
return new MyChannelPool(bootstrap, new _AbstractChannelPoolHandler());
}
};
ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap1 = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
@Override
protected FixedChannelPool newPool(InetSocketAddress key) {
Bootstrap bootstrap = func(group, key.getHostName(), key.getPort());
return new FixedChannelPool(bootstrap, new _AbstractChannelPoolHandler(), 10);
}
};
final ChannelPool myChannelPool = poolMap.get(new InetSocketAddress(host, port));
final CountDownLatch latch = new CountDownLatch(jobNumber);
for (int i = 0; i < jobNumber; i++) {
final int counter = i;
final Future<Channel> future = myChannelPool.acquire();
future.addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> f) {
if (f.isSuccess()) {
Channel ch = f.getNow();
// Do somethings
ch.writeAndFlush(new ButtyMessage().content()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("counter = " + counter);
System.out.println("future = " + future.channel());
latch.countDown();
}
}
});
// Release back to pool
myChannelPool.release(ch);
} else {
System.out.println(f.cause().getMessage());
f.cause().printStackTrace();
}
}
});
}
try {
latch.await();
System.exit(0);
} catch (InterruptedException ex) {
System.out.println("ex = " + ex.getMessage());
}
如你所见,我使用了SimpleChannelPool
和FixedChannelPool
(netty提供的SimpleChannelPool
的实现)。
它能做什么:
SimpleChannelPool
:根据需要打开频道 ---> 如果你有 100.000 条消息 -> 当然是因为错误。 Many socket open, then IOExeption: Too many file open occur. (那真的是池吗?尽可能多地创建并抛出异常?我不称之为池)
FixedChannelPool
:在我的情况下不起作用(还在研究为什么?=))对不起我的愚蠢)
事实上,我想改用 ObjectPool。我可能会在完成后立即 post 它。感谢@Frederic Brégier 对我的帮助!
当我编写生产者向我的服务器发布消息时。我看过这个:
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
我到处搜索,被告知因为频道已关闭。
但是,在我的代码中。当我的频道池破坏频道时,我只是关闭我的频道。
这是我的代码:
public static class ChannelFactory implements PoolableObjectFactory<Channel> {
private final Bootstrap bootstrap;
private String host;
private int port;
public ChannelFactory(Bootstrap bootstrap, String host, int port) {
this.bootstrap = bootstrap;
this.host = host;
this.port = port;
}
@Override
public Channel makeObject() throws Exception {
System.out.println("Create new channel!!!");
bootstrap.validate();
return bootstrap.connect(host, port).channel();
}
@Override
public void destroyObject(Channel channel) throws Exception {
ChannelFuture close = channel.close();
if (close.isSuccess()) {
System.out.println(channel + " close successfully");
}
}
@Override
public boolean validateObject(Channel channel) {
System.out.println("Validate object");
return (channel.isOpen());
}
@Override
public void activateObject(Channel channel) throws Exception {
System.out.println(channel + " is activated");
}
@Override
public void passivateObject(Channel channel) throws Exception {
System.out.println(channel + " is passivated");
}
/**
* @return the host
*/
public String getHost() {
return host;
}
/**
* @param host the host to set
* @return
*/
public ChannelFactory setHost(String host) {
this.host = host;
return this;
}
/**
* @return the port
*/
public int getPort() {
return port;
}
/**
* @param port the port to set
* @return
*/
public ChannelFactory setPort(int port) {
this.port = port;
return this;
}
}
这是我的亚军:
public static class Runner implements Runnable {
private Channel channel;
private ButtyMessage message;
private MyChannelPool channelPool;
public Runner(MyChannelPool channelPool, Channel channel, ButtyMessage message) {
this.channel = channel;
this.message = message;
this.channelPool = channelPool;
}
@Override
public void run() {
channel.writeAndFlush(message.content()).syncUninterruptibly().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channelPool.returnObject(future.channel());
}
});
}
}
还有我的主要:
public static void main(String[] args) throws InterruptedException {
final String host = "127.0.0.1";
final int port = 8080;
int jobSize = 100;
int jobNumber = 10000;
final Bootstrap b = func(host, port);
final MyChannelPool channelPool = new MyChannelPool(new ChannelFactory(b, host, port));
ExecutorService threadPool = Executors.newFixedThreadPool(1);
for (int i = 0; i < jobNumber; i++) {
try {
threadPool.execute(new Runner(channelPool, channelPool.borrowObject(), new ButtyMessage()));
} catch (Exception ex) {
System.out.println("ex = " + ex.getMessage());
}
}
}
用 ButtyMessage
扩展 ByteBufHolder
。
在我的 Runner class 中,如果我在 writeAndFlush
之后睡眠 (10),它 运行 很好。但我不想回复睡眠。所以我使用ChannelFutureListener
,但结果很糟糕。如果我发送大约 1000 到 10.000 条消息,它将崩溃并在上面抛出异常。有什么办法可以避免这种情况吗?
谢谢大家。
抱歉我的解释和英语不好:)
您有几个问题可以解释这一点。大部分都和异步操作的错误用法以及以后的用法有关。
不知道是不是link你的问题,但是如果真的要在频道真正关闭的时候打印,就得等以后了,因为未来
close()
(或任何其他操作)立即 returns,无需等待真正的关闭。因此,您的测试if (close.isSuccess())
应始终为假。public void destroyObject(final Channel channel) throws Exception { channel.close().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture close) { if (close.isSuccess()) { System.out.println(channel + " close successfully"); } } }); }
然而,我认为它只是为了调试目的,它不是强制性的。
另一个:你将一个尚未连接的频道发回你的池(这也许可以解释你的睡眠(10)?)。您必须等待
connect()
.public Channel makeObject() throws Exception { System.out.println("Create new channel!!!"); //bootstrap.validate(); // this is implicitely called in connect() ChannelFuture future = bootstrap.connect(host, port).awaitUninterruptibly(); if (future.isSuccess()) { return future.channel(); } else { // do what you need to do when the connection is not done } }
第三个:使用
验证已连接的频道可能会更好isActive()
:@Override public boolean validateObject(Channel channel) { System.out.println("Validate object"); return channel.isActive(); // instead of isOpen() }
第四个:在你的跑步者中,你错误地等待未来,而你不应该。您可以删除您的
syncUninterruptibly()
并让其余的保持原样。@Override public void run() { Channel.writeAndFlush(message.content()).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { channelPool.returnObject(future.channel()); } }); }
最后,我想你知道你的测试是完全顺序的(你的池中有 1 个线程),这样每个客户端都会重复使用同一个通道?
您能否尝试更改这 4 个点,看看是否可以解决您的问题?
编辑: 在请求者评论后
对于syncUntinterruptibly()
,我没有仔细看。如果你想在写时阻塞,那么你不需要额外的 addListener
因为一旦同步结束,未来就完成了。因此,您可以在同步后直接调用 channelPool.returnObject
作为下一个命令。
所以你应该这样写,更简单。
@Override
public void run() {
Channel.writeAndFlush(message.content()).syncUntinterruptibly();
channelPool.returnObject(future.channel());
}
对于 fireChannelActive
,它将在连接完成后立即调用(因此从 makeObject
开始,在将来的某个时间)。此外,一旦断开连接(正如您在异常中确实注意到的那样),该通道将不再可用,必须从零重新创建。所以我建议使用 isActive 但是,如果不活动,它将使用 destroyObject 删除...
看看通道状态模型here。
终于,我找到了适合自己的解决方案。但是,我仍在考虑另一种解决方案。 (此解决方案完全复制自 4.0.28 netty 发行说明)
final String host = "127.0.0.1";
final int port = 8080;
int jobNumber = 100000;
final EventLoopGroup group = new NioEventLoopGroup(100);
ChannelPoolMap<InetSocketAddress, MyChannelPool> poolMap = new AbstractChannelPoolMap<InetSocketAddress, MyChannelPool>() {
@Override
protected MyChannelPool newPool(InetSocketAddress key) {
Bootstrap bootstrap = func(group, key.getHostName(), key.getPort());
return new MyChannelPool(bootstrap, new _AbstractChannelPoolHandler());
}
};
ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap1 = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
@Override
protected FixedChannelPool newPool(InetSocketAddress key) {
Bootstrap bootstrap = func(group, key.getHostName(), key.getPort());
return new FixedChannelPool(bootstrap, new _AbstractChannelPoolHandler(), 10);
}
};
final ChannelPool myChannelPool = poolMap.get(new InetSocketAddress(host, port));
final CountDownLatch latch = new CountDownLatch(jobNumber);
for (int i = 0; i < jobNumber; i++) {
final int counter = i;
final Future<Channel> future = myChannelPool.acquire();
future.addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> f) {
if (f.isSuccess()) {
Channel ch = f.getNow();
// Do somethings
ch.writeAndFlush(new ButtyMessage().content()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("counter = " + counter);
System.out.println("future = " + future.channel());
latch.countDown();
}
}
});
// Release back to pool
myChannelPool.release(ch);
} else {
System.out.println(f.cause().getMessage());
f.cause().printStackTrace();
}
}
});
}
try {
latch.await();
System.exit(0);
} catch (InterruptedException ex) {
System.out.println("ex = " + ex.getMessage());
}
如你所见,我使用了SimpleChannelPool
和FixedChannelPool
(netty提供的SimpleChannelPool
的实现)。
它能做什么:
SimpleChannelPool
:根据需要打开频道 ---> 如果你有 100.000 条消息 -> 当然是因为错误。 Many socket open, then IOExeption: Too many file open occur. (那真的是池吗?尽可能多地创建并抛出异常?我不称之为池)
FixedChannelPool
:在我的情况下不起作用(还在研究为什么?=))对不起我的愚蠢)
事实上,我想改用 ObjectPool。我可能会在完成后立即 post 它。感谢@Frederic Brégier 对我的帮助!