Kafka Source - 理解 Selector.poll() 的语义

Kafka Source - Understanding the semantics of Selector.poll()

我正在研究 Kafka 的网络层代码,有几个关于选择器的问题 class,特别是 poll() 方法的实现方式。 poll() 方法是这样的:

void poll(int timeout){
....
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

...
}

是否有特定要求,因为我们首先调用 pollSelectionKeys() 方法 对于 select() 方法返回的键,然后是立即连接的键?是 只是为了清楚起见,我们分别执行这些操作,或者是否有一些特定的 涉及要求?

其次,在pollSelectionKeys()方法中,我们有:

void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                       boolean isImmediatelyConnected,
                                       long currentTimeNanos){
...
    /* if channel is ready write to any sockets that have space in their buffer and for which
    we have data */
    if (channel.ready() && key.isWritable()) {
        Send send = channel.write();
        if (send != null) {
            this.completedSends.add(send);
            this.sensors.recordBytesSent(channel.id(), send.size());
        }
    }
...
}

据我所知,我们只在 KafkaChannel 属于 我们从之前调用 select() 方法获得的 keySet,或者如果 KafkaChannelimmediatelyConnectedKeys 之一相关联。我的问题是,为什么我们要 以这种方式写信给KafkaChannels?更具体地说,我们不只是迭代 在所有已连接的 KafkaChannels 上,如果他们有 Send 则写信给他们 与他们相关的对象?这样,我们尽快写入KafkaChannel, 无需等待它属于 immediatelyConnectedKeysreadyKeys.

TCP 连接不可用,直到 I/O 连接完成

答案就在Selectorclass的connect方法中(下面相关部分)

 connected = socketChannel.connect(address);
..............................
................................

 SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);

根据 NIO 的文档解释 SocketChannel connect

If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.

所以典型的交互流程如下(解释的很好here

如果您以非阻塞模式连接,您应该:

  • register the channel for OP_CONNECT
  • when it fires call finishConnect()
  • if that returns true, deregister OP_CONNECT and register OP_READ or OP_WRITE depending on what you want to do next
  • if it returns false, do nothing, keep selecting
  • if either connect() or finishConnect() throws an exception, close the channel and try again or forget about it or tell the user or
    whatever is appropriate.

If you don't want to do anything until the channel connects, do the connect in blocking mode and go into non-blocking mode when the connect succeeds.

此连接方法可能会立即连接,就像本地连接的情况一样,并且可能不会触发为此连接注册的OP_CONNECT事件socketChannel(连接后的几行call) ,所以在使用典型的 java NIO 注册代码时,我们可能会错过它。我们最终需要在此类通道上调用 finishConnect(请参阅工作流程中的第二个要点)。所以我们将这样的频道密钥添加到另一个 Set immediatelyConnectedKeys 以便它们可以稍后处理,否则我们将完全错过它们。

 if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }

然后在 pollSelectionKeys 方法中(注意 finishConnect 的使用,它是对 finishConnect to the underlying SocketChannel

的调用
 /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
.........................
.........................

总而言之,Kafka 代码看起来像是标准的 NIO 代码,除非 Kafka 团队可以提供更多代码 explain.Further 可以找到关于这个主题的好书 . An interesting misunderstanding related to this ( bug filing and eventual rejection by JDK team) can be found here

对于问题的第二部分,您可能会询问以下代码。为什么两次单独调用密钥

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }

看到我们现在维护了两组键。虽然 selector.keys() 提供了一个整体键视图,但是键集不能直接修改,所以它是一种只读视图。这个key-set中的key只有在它被取消并且它的频道被注销后才会被移除。所以通常 selector.selectedKeys() 用于访问就绪通道。另外 selector.selectedKeys() 显然不会 return 来自 immediatelyConnectedKeys 的键。处理从 selector.selectedKeys() 获得的这些键的通常模式是迭代 set ,测试什么事件(可接受的,可连接的,readable/writable)由 钥匙已准备好,做你的事情,然后将其从集合中删除。这个删除部分相当 necessary.The Selector 不会从所选键集本身中删除 SelectionKey 实例。当您完成通道处理后,您必须这样做。 下次通道变为 "ready" 时,选择器将再次将其添加到所选键集中。所以这就是两者都被处理的原因,方法 pollSelectionKeys 旨在兼顾两者。