写入和刷新太多太快时 Netty 通道失败

Netty Channel fail when write and flush too many and too fast

当我编写生产者向我的服务器发布消息时。我看过这个:

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

我到处搜索,被告知因为频道已关闭。 但是,在我的代码中。当我的频道池破坏频道时,我只是关闭我的频道。
这是我的代码:

public static class ChannelFactory implements PoolableObjectFactory<Channel> {

    private final Bootstrap bootstrap;
    private String host;
    private int port;

    public ChannelFactory(Bootstrap bootstrap, String host, int port) {
        this.bootstrap = bootstrap;
        this.host = host;
        this.port = port;
    }

    @Override
    public Channel makeObject() throws Exception {
        System.out.println("Create new channel!!!");
        bootstrap.validate();
        return bootstrap.connect(host, port).channel();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        ChannelFuture close = channel.close();
        if (close.isSuccess()) {
            System.out.println(channel + " close successfully");
        }
    }

    @Override
    public boolean validateObject(Channel channel) {
        System.out.println("Validate object");
        return (channel.isOpen());
    }

    @Override
    public void activateObject(Channel channel) throws Exception {
        System.out.println(channel + " is activated");
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        System.out.println(channel + " is passivated");
    }

    /**
     * @return the host
     */
    public String getHost() {
        return host;
    }

    /**
     * @param host the host to set
     * @return
     */
    public ChannelFactory setHost(String host) {
        this.host = host;
        return this;
    }

    /**
     * @return the port
     */
    public int getPort() {
        return port;
    }

    /**
     * @param port the port to set
     * @return
     */
    public ChannelFactory setPort(int port) {
        this.port = port;
        return this;
    }

}

这是我的亚军:

public static class Runner implements Runnable {

    private Channel channel;
    private ButtyMessage message;
    private MyChannelPool channelPool;

    public Runner(MyChannelPool channelPool, Channel channel, ButtyMessage message) {
        this.channel = channel;
        this.message = message;
        this.channelPool = channelPool;
    }

    @Override
    public void run() {
        channel.writeAndFlush(message.content()).syncUninterruptibly().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                channelPool.returnObject(future.channel());
            }
        });
    }

}

还有我的主要:

public static void main(String[] args) throws InterruptedException {

    final String host = "127.0.0.1";
    final int port = 8080;
    int jobSize = 100;
    int jobNumber = 10000;
    final Bootstrap b = func(host, port);

    final MyChannelPool channelPool = new MyChannelPool(new ChannelFactory(b, host, port));

    ExecutorService threadPool = Executors.newFixedThreadPool(1);
    for (int i = 0; i < jobNumber; i++) {
        try {
            threadPool.execute(new Runner(channelPool, channelPool.borrowObject(), new ButtyMessage()));
        } catch (Exception ex) {
            System.out.println("ex = " + ex.getMessage());
        }
    }
}

ButtyMessage 扩展 ByteBufHolder
在我的 Runner class 中,如果我在 writeAndFlush 之后睡眠 (10),它 运行 很好。但我不想回复睡眠。所以我使用ChannelFutureListener,但结果很糟糕。如果我发送大约 1000 到 10.000 条消息,它将崩溃并在上面抛出异常。有什么办法可以避免这种情况吗?

谢谢大家。
抱歉我的解释和英语不好:)

您有几个问题可以解释这一点。大部分都和异步操作的错误用法以及以后的用法有关。

  • 不知道是不是link你的问题,但是如果真的要在频道真正关闭的时候打印,就得等以后了,因为未来 close()(或任何其他操作)立即 returns,无需等待真正的关闭。因此,您的测试 if (close.isSuccess()) 应始终为假。

    public void destroyObject(final Channel channel) throws Exception {
       channel.close().addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture close) {
            if (close.isSuccess()) {
               System.out.println(channel + " close successfully");
            }
          }
       });
    }
    

然而,我认为它只是为了调试目的,它不是强制性的。

  • 另一个:你将一个尚未连接的频道发回你的池(这也许可以解释你的睡眠(10)?)。您必须等待 connect().

    public Channel makeObject() throws Exception {
       System.out.println("Create new channel!!!");
       //bootstrap.validate(); // this is implicitely called in connect()
       ChannelFuture future = bootstrap.connect(host, port).awaitUninterruptibly();
       if (future.isSuccess()) {
         return future.channel();
       } else {
         // do what you need to do when the connection is not done
       }
    }
    
  • 第三个:使用 isActive():

    验证已连接的频道可能会更好
    @Override
    public boolean validateObject(Channel channel) {
        System.out.println("Validate object");
        return channel.isActive(); // instead of isOpen()
    }
    
  • 第四个:在你的跑步者中,你错误地等待未来,而你不应该。您可以删除您的 syncUninterruptibly() 并让其余的保持原样。

    @Override
    public void run() {
       Channel.writeAndFlush(message.content()).addListener(new ChannelFutureListener() {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
            channelPool.returnObject(future.channel());
         }
       });
    }
    
  • 最后,我想你知道你的测试是完全顺序的(你的池中有 1 个线程),这样每个客户端都会重复使用同一个通道?

您能否尝试更改这 4 个点,看看是否可以解决您的问题?


编辑: 在请求者评论后

对于syncUntinterruptibly(),我没有仔细看。如果你想在写时阻塞,那么你不需要额外的 addListener 因为一旦同步结束,未来就完成了。因此,您可以在同步后直接调用 channelPool.returnObject 作为下一个命令。

所以你应该这样写,更简单。

    @Override
    public void run() {
       Channel.writeAndFlush(message.content()).syncUntinterruptibly();
       channelPool.returnObject(future.channel());
    }

对于 fireChannelActive,它将在连接完成后立即调用(因此从 makeObject 开始,在将来的某个时间)。此外,一旦断开连接(正如您在异常中确实注意到的那样),该通道将不再可用,必须从零重新创建。所以我建议使用 isActive 但是,如果不活动,它将使用 destroyObject 删除...

看看通道状态模型here

终于,我找到了适合自己的解决方案。但是,我仍在考虑另一种解决方案。 (此解决方案完全复制自 4.0.28 netty 发行说明)

final String host = "127.0.0.1";
    final int port = 8080;
    int jobNumber = 100000;
    final EventLoopGroup group = new NioEventLoopGroup(100);

    ChannelPoolMap<InetSocketAddress, MyChannelPool> poolMap = new AbstractChannelPoolMap<InetSocketAddress, MyChannelPool>() {

        @Override
        protected MyChannelPool newPool(InetSocketAddress key) {
            Bootstrap bootstrap = func(group, key.getHostName(), key.getPort());
            return new MyChannelPool(bootstrap, new _AbstractChannelPoolHandler());
        }
    };

    ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap1 = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {

        @Override
        protected FixedChannelPool newPool(InetSocketAddress key) {
            Bootstrap bootstrap = func(group, key.getHostName(), key.getPort());
            return new FixedChannelPool(bootstrap, new _AbstractChannelPoolHandler(), 10);
        }
    };

    final ChannelPool myChannelPool = poolMap.get(new InetSocketAddress(host, port));
    final CountDownLatch latch = new CountDownLatch(jobNumber);

    for (int i = 0; i < jobNumber; i++) {
        final int counter = i;
        final Future<Channel> future = myChannelPool.acquire();
        future.addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> f) {
                if (f.isSuccess()) {
                    Channel ch = f.getNow();
                    // Do somethings
                    ch.writeAndFlush(new ButtyMessage().content()).addListener(new ChannelFutureListener() {

                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                System.out.println("counter = " + counter);
                                System.out.println("future = " + future.channel());
                                latch.countDown();
                            }
                        }
                    });
                    // Release back to pool
                    myChannelPool.release(ch);

                } else {
                    System.out.println(f.cause().getMessage());
                    f.cause().printStackTrace();
                }
            }
        });
    }
    try {
        latch.await();
        System.exit(0);
    } catch (InterruptedException ex) {
        System.out.println("ex = " + ex.getMessage());
    }

如你所见,我使用了SimpleChannelPoolFixedChannelPool(netty提供的SimpleChannelPool的实现)。
它能做什么:
SimpleChannelPool:根据需要打开频道 ---> 如果你有 100.000 条消息 -> 当然是因为错误。 Many socket open, then IOExeption: Too many file open occur. (那真的是池吗?尽可能多地创建并抛出异常?我不称之为池)
FixedChannelPool:在我的情况下不起作用(还在研究为什么?=))对不起我的愚蠢) 事实上,我想改用 ObjectPool。我可能会在完成后立即 post 它。感谢@Frederic Brégier 对我的帮助!