Netty 4.0:产生、维护和与许多 clients/peers (TCP) 通信

Netty 4.0: Spawning off, maintaining and communicating with many clients/peers (TCP)

我需要与未知数量的对等点建立 TCP 连接。我 运行 遇到了一些问题,不确定我的整体方法是否正确。我当前在客户端的设置包括一个共享其 EventLoopGroup 并根据需要创建客户端的 Peer Manager:

public class PeerManagement
{
  public PeerManagement()
  {
   // this group is shared across all clients
   _eventLoopGroup = new NioEventLoopGroup();
   _peers = new ConcurrentHashMap<>();
  }

  public void send(String s, String host)
  {
   // ensure that the peer exists
   setPeer(host);

   // look up the peer
   Peer requestedPeer = _peers.get(host);

   // send the request directly to the peer
   requestedPeer.send(s);
  }

  private synchronized void setPeer(String host)
  {
    if (!_peers.containsKey(host))
    {
     // create the Peer using the EventLoopGroup & connect
     Peer peer = new Peer();
     peer.connect(_eventLoopGroup, host);
     // add the peer to the Peer list
     _peers.put(host, peer);
    }
  }
}

同行class:

public class Peer
{
  private static final int PORT = 6010;

  private Bootstrap _bootstrap;
  private ChannelFuture _channelFuture;

  public boolean connect(EventLoopGroup eventLoopGroup, String host)
  {
    _bootstrap = new Bootstrap();
    _bootstrap.group(eventLoopGroup)
   .channel(NioSocketChannel.class)
   .option(ChannelOption.SO_KEEPALIVE, true)
   .handler(new ChannelInitializer<SocketChannel>()
   {
    @Override
    public void initChannel(SocketChannel socketChannel) throws Exception
    {
        socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder( 1024,0,4,0,4));
        socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
        socketChannel.pipeline().addLast("customHandler", new CustomPeerHandler());
      }
    } );

   // hold this for communicating with client
   _channelFuture = _bootstrap.connect(host, PORT);
   return _channelFuture.syncUninterruptibly().isSuccess();
  }

  public boolean send(String s)
  {
   if (_channelFuture.channel().isWritable())
   {
    // not the best method but String will be replaced by byte[]
    ByteBuf buffer = _channelFuture.channel().alloc().buffer();
    buffer.writeBytes(s.getBytes());

    // NEVER returns true but the message is sent
    return _channelFuture.channel().writeAndFlush(buffer).isSuccess();
   }
   return false;
  }

}

如果我发送以下字符串 "this is a test",则 writeAndFlush.isSuccess() 始终为 false 但会发送消息,然后我在服务器端得到以下内容:

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f                   |..........      |
+--------+-------------------------------------------------+----------------+

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0e 74 68 69 73 20 69 73 20 61 20 74 65 |....this is a te|
|00000010| 73 74                                           |st              |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f                   |..........      |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded

writeAndFlush().isSuccess() returns false 的原因是,与所有出站命令一样,writeAndFlush() 是异步的。实际写入是在通道的事件循环线程中完成的,而当您在主线程中调用 isSuccess() 时,这还没有发生。如果你想阻塞并等待写入完成,你可以使用:

channel.writeAndFlush(msg).sync().isSuccess();

您在服务器端看到的错误是因为此帧在您的 "this is a test" 消息之前到达:

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f                   |..........      |
+--------+-------------------------------------------------+----------------+

LengthFieldBasedFrameDecoder试图解码前4个字节ff 00 00 00作为长度,这显然太大了。你知道发送这个帧的是什么吗?会不会是你的CustomPeerHandler