如何在 NIO Server 上保存每个通道的数据

How to keep data with each channel on NIO Server

我有一个 Java NIO 服务器,它从客户端接收数据。

当通道准备好读取时,即 key.isReadable() return true read(key) 被调用以读取数据。

目前我对所有通道使用单个读取缓冲区,在 read() 方法中,我清除缓冲区并读入它,然后最后放入一个字节数组,假设我将获取所有数据一枪。

但是假设我一次没有得到完整的数据(我在数据结尾处有特殊字符需要检测)。

问题:

那么现在如何将这部分数据保留在通道中或如何处理部分读取问题?或全球?

我看附件哪里不好

看看 Reactor 模式。这是 Doug Lea 教授对基本实现的 link:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

想法是让单个反应器线程在 Selector 调用时阻塞。一旦准备好 IO 事件,反应器线程就会将事件分派给适当的处理程序。 在上面的 pdf 中,Reactor 中有一个内部 class Acceptor,它接受新的连接。

作者对读取和写入事件使用单个处理程序并维护此处理程序的状态。我更喜欢为读取和写入设置单独的处理程序,但这不像 'state machine' 那样容易使用。每个事件只能有一个附件,因此需要某种注入来切换 read/write 处理程序。

要在后续 read/write 之间保持状态,您必须做几件事:

  • 引入自定义协议,在消息被完全阅读时告诉您
  • 有超时或清除失效连接的机制
  • 维护特定于客户的会话

所以,你可以这样做:

public class Reactor implements Runnable{

    Selector selector = Selector.open();

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    public Reactor(int port) throws IOException {

        serverSocketChannel.socket().bind(new InetSocketAddress(port));

        serverSocketChannel.configureBlocking(false);

        // let Reactor handle new connection events
        registerAcceptor();

    }

    /**
     * Registers Acceptor as handler for new client connections.
     * 
     * @throws ClosedChannelException
     */
    private void registerAcceptor() throws ClosedChannelException {


        SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        selectionKey0.attach(new Acceptor());
    }

    @Override
    public void run(){

        while(!Thread.interrupted()){

            startReactorLoop();

        }

    }

    private void startReactorLoop() {

        try {

            // wait for new events for each registered or new clients
            selector.select();

            // get selection keys for pending events
            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            Iterator<SelectionKey> selectedKeysIterator = selectedKeys.iterator();

            while (selectedKeysIterator.hasNext()) {

                // dispatch even to handler for the given key
                dispatch(selectedKeysIterator.next());

                // remove dispatched key from the collection
                selectedKeysIterator.remove();
            }

        } catch (IOException e) {
            // TODO add handling of this exception
            e.printStackTrace();
        }
    }

    private void dispatch(SelectionKey interestedEvent) {

        if (interestedEvent.attachment() != null) {

            EventHandler handler = (EventHandler) interestedEvent.attachment();

            handler.processEvent();
        }

    }

    private class Acceptor implements EventHandler {

        @Override
        public void processEvent() {

            try {

                SocketChannel clientConnection = serverSocketChannel.accept();

                if (clientConnection != null) {

                    registerChannel(clientConnection);

                }

            } catch (IOException e) {e.printStackTrace();}

        }
    /**
     *  Save Channel - key association - in Map perhaps.
     * This is required for subsequent/partial reads/writes
     */
    private void registerChannel(SocketChannel clientChannel) {


        // notify injection mechanism of new connection (so it can activate Read Handler)
}

处理读取事件后,通知注入机制可以注入写入处理程序。

当新连接可用时,注入机制会创建一次读写处理程序的新实例。这种注入机制会根据需要切换处理程序。每个 Channel 的处理程序的查找是通过方法 `registerChannel().

在连接 Acceptance 中填充的 Map 完成的。

读取和写入处理程序有 ByteBuffer 个实例,并且由于每个套接字通道都有自己的一对处理程序,您现在可以在部分读取和写入之间维护状态。

提高性能的两个技巧:

  • 尝试在接受连接后立即进行首次读取。仅当您没有读取自定义协议中 header 定义的足够数据时,才为读取事件注册频道兴趣。

  • 尝试先写而不注册写事件的兴趣,只有当你不写所有数据时,才注册兴趣 写.

这将减少选择器唤醒的次数。

像这样:

SocketChannel socketChannel;

byte[] outData;

final static int MAX_OUTPUT = 1024;

ByteBuffer output = ByteBuffer.allocate(MAX_OUTPUT);

// if message was not written fully
if (socketChannel.write(output) < messageSize()) {

// register interest for write event
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE); 
        selectionKey.attach(writeHandler);
        selector.wakeup();

}

最后,应该有定时任务检查连接是否仍然alive/SelectionKeys被取消。如果客户端断开 TCP 连接,服务器通常不会知道这一点。结果,内存中将有许多事件处理程序,作为附件绑定到陈旧的连接,这将导致内存泄漏。

这就是为什么你会说附件不好,但问题是可以解决的。

这里有两种简单的方法来处理这个问题:

  • 可以启用 TCP 保活

  • 周期性任务可以检查给定通道上最后一个 activity 的时间戳。如果长时间空闲,服务器应该终止连接。

有一篇来自 Amazon 某人的古老且非常不准确的 NIO 博客,其中错误地断言密钥附件是内存泄漏。完整而彻底的废话。甚至不符合逻辑。这也是他断言您需要各种补充队列的地方。在 NIO 大约 13 年的时间里,从来没有这样做过。

您需要的是每个通道一个 ByteBuffer 或者两个,一个用于读取,一个用于写入。您可以存储一个作为附件本身:如果您想要两个,或者要存储其他数据,您需要自己定义一个 Session class ,它包含两个缓冲区和您想要关联的任何其他内容使用通道,例如客户端凭据,并使用 Session 对象作为附件。

你真的不能在 NIO 中为所有通道使用一个缓冲区。