如何在 NIO Server 上保存每个通道的数据
How to keep data with each channel on NIO Server
我有一个 Java NIO 服务器,它从客户端接收数据。
当通道准备好读取时,即 key.isReadable()
return true read(key)
被调用以读取数据。
目前我对所有通道使用单个读取缓冲区,在 read()
方法中,我清除缓冲区并读入它,然后最后放入一个字节数组,假设我将获取所有数据一枪。
但是假设我一次没有得到完整的数据(我在数据结尾处有特殊字符需要检测)。
问题:
那么现在如何将这部分数据保留在通道中或如何处理部分读取问题?或全球?
我看附件哪里不好
看看 Reactor 模式。这是 Doug Lea 教授对基本实现的 link:
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
想法是让单个反应器线程在 Selector 调用时阻塞。一旦准备好 IO 事件,反应器线程就会将事件分派给适当的处理程序。
在上面的 pdf 中,Reactor 中有一个内部 class Acceptor,它接受新的连接。
作者对读取和写入事件使用单个处理程序并维护此处理程序的状态。我更喜欢为读取和写入设置单独的处理程序,但这不像 'state machine' 那样容易使用。每个事件只能有一个附件,因此需要某种注入来切换 read/write 处理程序。
要在后续 read/write 之间保持状态,您必须做几件事:
- 引入自定义协议,在消息被完全阅读时告诉您
- 有超时或清除失效连接的机制
- 维护特定于客户的会话
所以,你可以这样做:
public class Reactor implements Runnable{
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
public Reactor(int port) throws IOException {
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// let Reactor handle new connection events
registerAcceptor();
}
/**
* Registers Acceptor as handler for new client connections.
*
* @throws ClosedChannelException
*/
private void registerAcceptor() throws ClosedChannelException {
SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey0.attach(new Acceptor());
}
@Override
public void run(){
while(!Thread.interrupted()){
startReactorLoop();
}
}
private void startReactorLoop() {
try {
// wait for new events for each registered or new clients
selector.select();
// get selection keys for pending events
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> selectedKeysIterator = selectedKeys.iterator();
while (selectedKeysIterator.hasNext()) {
// dispatch even to handler for the given key
dispatch(selectedKeysIterator.next());
// remove dispatched key from the collection
selectedKeysIterator.remove();
}
} catch (IOException e) {
// TODO add handling of this exception
e.printStackTrace();
}
}
private void dispatch(SelectionKey interestedEvent) {
if (interestedEvent.attachment() != null) {
EventHandler handler = (EventHandler) interestedEvent.attachment();
handler.processEvent();
}
}
private class Acceptor implements EventHandler {
@Override
public void processEvent() {
try {
SocketChannel clientConnection = serverSocketChannel.accept();
if (clientConnection != null) {
registerChannel(clientConnection);
}
} catch (IOException e) {e.printStackTrace();}
}
/**
* Save Channel - key association - in Map perhaps.
* This is required for subsequent/partial reads/writes
*/
private void registerChannel(SocketChannel clientChannel) {
// notify injection mechanism of new connection (so it can activate Read Handler)
}
处理读取事件后,通知注入机制可以注入写入处理程序。
当新连接可用时,注入机制会创建一次读写处理程序的新实例。这种注入机制会根据需要切换处理程序。每个 Channel 的处理程序的查找是通过方法 `registerChannel().
在连接 Acceptance 中填充的 Map 完成的。
读取和写入处理程序有 ByteBuffer
个实例,并且由于每个套接字通道都有自己的一对处理程序,您现在可以在部分读取和写入之间维护状态。
提高性能的两个技巧:
尝试在接受连接后立即进行首次读取。仅当您没有读取自定义协议中 header 定义的足够数据时,才为读取事件注册频道兴趣。
尝试先写而不注册写事件的兴趣,只有当你不写所有数据时,才注册兴趣
写.
这将减少选择器唤醒的次数。
像这样:
SocketChannel socketChannel;
byte[] outData;
final static int MAX_OUTPUT = 1024;
ByteBuffer output = ByteBuffer.allocate(MAX_OUTPUT);
// if message was not written fully
if (socketChannel.write(output) < messageSize()) {
// register interest for write event
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE);
selectionKey.attach(writeHandler);
selector.wakeup();
}
最后,应该有定时任务检查连接是否仍然alive/SelectionKeys被取消。如果客户端断开 TCP 连接,服务器通常不会知道这一点。结果,内存中将有许多事件处理程序,作为附件绑定到陈旧的连接,这将导致内存泄漏。
这就是为什么你会说附件不好,但问题是可以解决的。
这里有两种简单的方法来处理这个问题:
可以启用 TCP 保活
周期性任务可以检查给定通道上最后一个 activity 的时间戳。如果长时间空闲,服务器应该终止连接。
有一篇来自 Amazon 某人的古老且非常不准确的 NIO 博客,其中错误地断言密钥附件是内存泄漏。完整而彻底的废话。甚至不符合逻辑。这也是他断言您需要各种补充队列的地方。在 NIO 大约 13 年的时间里,从来没有这样做过。
您需要的是每个通道一个 ByteBuffer
, 或者两个,一个用于读取,一个用于写入。您可以存储一个作为附件本身:如果您想要两个,或者要存储其他数据,您需要自己定义一个 Session
class ,它包含两个缓冲区和您想要关联的任何其他内容使用通道,例如客户端凭据,并使用 Session
对象作为附件。
你真的不能在 NIO 中为所有通道使用一个缓冲区。
我有一个 Java NIO 服务器,它从客户端接收数据。
当通道准备好读取时,即 key.isReadable()
return true read(key)
被调用以读取数据。
目前我对所有通道使用单个读取缓冲区,在 read()
方法中,我清除缓冲区并读入它,然后最后放入一个字节数组,假设我将获取所有数据一枪。
但是假设我一次没有得到完整的数据(我在数据结尾处有特殊字符需要检测)。
问题:
那么现在如何将这部分数据保留在通道中或如何处理部分读取问题?或全球?
我看附件哪里不好
看看 Reactor 模式。这是 Doug Lea 教授对基本实现的 link:
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
想法是让单个反应器线程在 Selector 调用时阻塞。一旦准备好 IO 事件,反应器线程就会将事件分派给适当的处理程序。 在上面的 pdf 中,Reactor 中有一个内部 class Acceptor,它接受新的连接。
作者对读取和写入事件使用单个处理程序并维护此处理程序的状态。我更喜欢为读取和写入设置单独的处理程序,但这不像 'state machine' 那样容易使用。每个事件只能有一个附件,因此需要某种注入来切换 read/write 处理程序。
要在后续 read/write 之间保持状态,您必须做几件事:
- 引入自定义协议,在消息被完全阅读时告诉您
- 有超时或清除失效连接的机制
- 维护特定于客户的会话
所以,你可以这样做:
public class Reactor implements Runnable{
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
public Reactor(int port) throws IOException {
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// let Reactor handle new connection events
registerAcceptor();
}
/**
* Registers Acceptor as handler for new client connections.
*
* @throws ClosedChannelException
*/
private void registerAcceptor() throws ClosedChannelException {
SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey0.attach(new Acceptor());
}
@Override
public void run(){
while(!Thread.interrupted()){
startReactorLoop();
}
}
private void startReactorLoop() {
try {
// wait for new events for each registered or new clients
selector.select();
// get selection keys for pending events
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> selectedKeysIterator = selectedKeys.iterator();
while (selectedKeysIterator.hasNext()) {
// dispatch even to handler for the given key
dispatch(selectedKeysIterator.next());
// remove dispatched key from the collection
selectedKeysIterator.remove();
}
} catch (IOException e) {
// TODO add handling of this exception
e.printStackTrace();
}
}
private void dispatch(SelectionKey interestedEvent) {
if (interestedEvent.attachment() != null) {
EventHandler handler = (EventHandler) interestedEvent.attachment();
handler.processEvent();
}
}
private class Acceptor implements EventHandler {
@Override
public void processEvent() {
try {
SocketChannel clientConnection = serverSocketChannel.accept();
if (clientConnection != null) {
registerChannel(clientConnection);
}
} catch (IOException e) {e.printStackTrace();}
}
/**
* Save Channel - key association - in Map perhaps.
* This is required for subsequent/partial reads/writes
*/
private void registerChannel(SocketChannel clientChannel) {
// notify injection mechanism of new connection (so it can activate Read Handler)
}
处理读取事件后,通知注入机制可以注入写入处理程序。
当新连接可用时,注入机制会创建一次读写处理程序的新实例。这种注入机制会根据需要切换处理程序。每个 Channel 的处理程序的查找是通过方法 `registerChannel().
在连接 Acceptance 中填充的 Map 完成的。读取和写入处理程序有 ByteBuffer
个实例,并且由于每个套接字通道都有自己的一对处理程序,您现在可以在部分读取和写入之间维护状态。
提高性能的两个技巧:
尝试在接受连接后立即进行首次读取。仅当您没有读取自定义协议中 header 定义的足够数据时,才为读取事件注册频道兴趣。
尝试先写而不注册写事件的兴趣,只有当你不写所有数据时,才注册兴趣 写.
这将减少选择器唤醒的次数。
像这样:
SocketChannel socketChannel;
byte[] outData;
final static int MAX_OUTPUT = 1024;
ByteBuffer output = ByteBuffer.allocate(MAX_OUTPUT);
// if message was not written fully
if (socketChannel.write(output) < messageSize()) {
// register interest for write event
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE);
selectionKey.attach(writeHandler);
selector.wakeup();
}
最后,应该有定时任务检查连接是否仍然alive/SelectionKeys被取消。如果客户端断开 TCP 连接,服务器通常不会知道这一点。结果,内存中将有许多事件处理程序,作为附件绑定到陈旧的连接,这将导致内存泄漏。
这就是为什么你会说附件不好,但问题是可以解决的。
这里有两种简单的方法来处理这个问题:
可以启用 TCP 保活
周期性任务可以检查给定通道上最后一个 activity 的时间戳。如果长时间空闲,服务器应该终止连接。
有一篇来自 Amazon 某人的古老且非常不准确的 NIO 博客,其中错误地断言密钥附件是内存泄漏。完整而彻底的废话。甚至不符合逻辑。这也是他断言您需要各种补充队列的地方。在 NIO 大约 13 年的时间里,从来没有这样做过。
您需要的是每个通道一个 ByteBuffer
, 或者两个,一个用于读取,一个用于写入。您可以存储一个作为附件本身:如果您想要两个,或者要存储其他数据,您需要自己定义一个 Session
class ,它包含两个缓冲区和您想要关联的任何其他内容使用通道,例如客户端凭据,并使用 Session
对象作为附件。
你真的不能在 NIO 中为所有通道使用一个缓冲区。