Netty 4.0:产生、维护和与许多 clients/peers (TCP) 通信
Netty 4.0: Spawning off, maintaining and communicating with many clients/peers (TCP)
我需要与未知数量的对等点建立 TCP 连接。我 运行 遇到了一些问题,不确定我的整体方法是否正确。我当前在客户端的设置包括一个共享其 EventLoopGroup 并根据需要创建客户端的 Peer Manager:
public class PeerManagement
{
public PeerManagement()
{
// this group is shared across all clients
_eventLoopGroup = new NioEventLoopGroup();
_peers = new ConcurrentHashMap<>();
}
public void send(String s, String host)
{
// ensure that the peer exists
setPeer(host);
// look up the peer
Peer requestedPeer = _peers.get(host);
// send the request directly to the peer
requestedPeer.send(s);
}
private synchronized void setPeer(String host)
{
if (!_peers.containsKey(host))
{
// create the Peer using the EventLoopGroup & connect
Peer peer = new Peer();
peer.connect(_eventLoopGroup, host);
// add the peer to the Peer list
_peers.put(host, peer);
}
}
}
同行class:
public class Peer
{
private static final int PORT = 6010;
private Bootstrap _bootstrap;
private ChannelFuture _channelFuture;
public boolean connect(EventLoopGroup eventLoopGroup, String host)
{
_bootstrap = new Bootstrap();
_bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>()
{
@Override
public void initChannel(SocketChannel socketChannel) throws Exception
{
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder( 1024,0,4,0,4));
socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
socketChannel.pipeline().addLast("customHandler", new CustomPeerHandler());
}
} );
// hold this for communicating with client
_channelFuture = _bootstrap.connect(host, PORT);
return _channelFuture.syncUninterruptibly().isSuccess();
}
public boolean send(String s)
{
if (_channelFuture.channel().isWritable())
{
// not the best method but String will be replaced by byte[]
ByteBuf buffer = _channelFuture.channel().alloc().buffer();
buffer.writeBytes(s.getBytes());
// NEVER returns true but the message is sent
return _channelFuture.channel().writeAndFlush(buffer).isSuccess();
}
return false;
}
}
如果我发送以下字符串 "this is a test",则 writeAndFlush.isSuccess() 始终为 false 但会发送消息,然后我在服务器端得到以下内容:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0e 74 68 69 73 20 69 73 20 61 20 74 65 |....this is a te|
|00000010| 73 74 |st |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded
writeAndFlush().isSuccess()
returns false
的原因是,与所有出站命令一样,writeAndFlush()
是异步的。实际写入是在通道的事件循环线程中完成的,而当您在主线程中调用 isSuccess()
时,这还没有发生。如果你想阻塞并等待写入完成,你可以使用:
channel.writeAndFlush(msg).sync().isSuccess();
您在服务器端看到的错误是因为此帧在您的 "this is a test" 消息之前到达:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
LengthFieldBasedFrameDecoder
试图解码前4个字节ff 00 00 00
作为长度,这显然太大了。你知道发送这个帧的是什么吗?会不会是你的CustomPeerHandler
?
我需要与未知数量的对等点建立 TCP 连接。我 运行 遇到了一些问题,不确定我的整体方法是否正确。我当前在客户端的设置包括一个共享其 EventLoopGroup 并根据需要创建客户端的 Peer Manager:
public class PeerManagement
{
public PeerManagement()
{
// this group is shared across all clients
_eventLoopGroup = new NioEventLoopGroup();
_peers = new ConcurrentHashMap<>();
}
public void send(String s, String host)
{
// ensure that the peer exists
setPeer(host);
// look up the peer
Peer requestedPeer = _peers.get(host);
// send the request directly to the peer
requestedPeer.send(s);
}
private synchronized void setPeer(String host)
{
if (!_peers.containsKey(host))
{
// create the Peer using the EventLoopGroup & connect
Peer peer = new Peer();
peer.connect(_eventLoopGroup, host);
// add the peer to the Peer list
_peers.put(host, peer);
}
}
}
同行class:
public class Peer
{
private static final int PORT = 6010;
private Bootstrap _bootstrap;
private ChannelFuture _channelFuture;
public boolean connect(EventLoopGroup eventLoopGroup, String host)
{
_bootstrap = new Bootstrap();
_bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>()
{
@Override
public void initChannel(SocketChannel socketChannel) throws Exception
{
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder( 1024,0,4,0,4));
socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
socketChannel.pipeline().addLast("customHandler", new CustomPeerHandler());
}
} );
// hold this for communicating with client
_channelFuture = _bootstrap.connect(host, PORT);
return _channelFuture.syncUninterruptibly().isSuccess();
}
public boolean send(String s)
{
if (_channelFuture.channel().isWritable())
{
// not the best method but String will be replaced by byte[]
ByteBuf buffer = _channelFuture.channel().alloc().buffer();
buffer.writeBytes(s.getBytes());
// NEVER returns true but the message is sent
return _channelFuture.channel().writeAndFlush(buffer).isSuccess();
}
return false;
}
}
如果我发送以下字符串 "this is a test",则 writeAndFlush.isSuccess() 始终为 false 但会发送消息,然后我在服务器端得到以下内容:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0e 74 68 69 73 20 69 73 20 61 20 74 65 |....this is a te|
|00000010| 73 74 |st |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded
writeAndFlush().isSuccess()
returns false
的原因是,与所有出站命令一样,writeAndFlush()
是异步的。实际写入是在通道的事件循环线程中完成的,而当您在主线程中调用 isSuccess()
时,这还没有发生。如果你想阻塞并等待写入完成,你可以使用:
channel.writeAndFlush(msg).sync().isSuccess();
您在服务器端看到的错误是因为此帧在您的 "this is a test" 消息之前到达:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
LengthFieldBasedFrameDecoder
试图解码前4个字节ff 00 00 00
作为长度,这显然太大了。你知道发送这个帧的是什么吗?会不会是你的CustomPeerHandler
?