CreateIoCompletionPort 中的 NumberOfConcurrentThreads 参数

NumberOfConcurrentThreads parameter in CreateIoCompletionPort

仍然CreateIoCompletionPort()中的NumberOfConcurrentThreads参数感到困惑。我已经阅读并重读了 MSDN dox,但引用

This value limits the number of runnable threads associated with the completion port.

我仍然很困惑。

问题

假设我将此值指定为 4。在这种情况下,这是否意味着:

1) 一个线程可以调用 GetQueuedCompletionStatus() (此时我可以允许另外 3 个线程进行此调用),然后一旦该调用 returns (即我们有一个完成packet) 然后我可以让 4 个线程再次调用这个函数,

2) 一个线程可以调用 GetQueuedCompletionStatus() (此时我可以允许另外 3 个线程进行此调用),然后一旦该调用 returns (即我们有一个完成数据包)然后我继续处理该数据包。只有当我完成数据包处理后,我才会调用 GetQueuedCompletionStatus(),此时我可以 然后 有 4 个线程再次调用此函数。

看到我的困惑了吗?它使用短语 'runnable threads'.

我觉得可能是后者,因为上面的link也引用了

If your transaction required a lengthy computation, a larger concurrency value will allow more threads to run. Each completion packet may take longer to finish, but more completion packets will be processed at the same time.

这将最终影响我们设计服务器的方式。考虑一个从客户端接收数据的服务器,然后将该数据回显到日志服务器。这是我们的线程例程的样子:

DWORD WINAPI ServerWorkerThread(HANDLE hCompletionPort)
{
    DWORD BytesTransferred;
    CPerHandleData* PerHandleData = nullptr;
    CPerOperationData* PerIoData = nullptr;

    while (TRUE)
    {
        if (GetQueuedCompletionStatus(hCompletionPort, &BytesTransferred,
            (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE))
        {
            // OK, we have 'BytesTransferred' of data in 'PerIoData', process it:
            // send the data onto our logging servers, then loop back around
            send(...);
        }
    }
    return 0;
}

现在假设我有一台四核机器;如果我在对 CreateIoCompletionPort() 的调用中将 NumberOfConcurrentThreads 保留为零,我将有四个线程 运行 ServerWorkerThread()。很好。

我担心的是 send() 调用可能会因网络流量而花费很长时间。因此,我可能会从客户端接收大量无法出列的数据,因为所有四个线程都需要很长时间才能发送数据?!

我是不是忽略了重点?

更新 07.03.2018 (现已解决:参见 。)

我的机器上有 8 个线程 运行,每个线程都运行 ServerWorkerThread()

DWORD WINAPI ServerWorkerThread(HANDLE hCompletionPort)
{
DWORD BytesTransferred;
CPerHandleData* PerHandleData = nullptr;
CPerOperationData* PerIoData = nullptr;

while (TRUE)
{
    if (GetQueuedCompletionStatus(hCompletionPort, &BytesTransferred,
        (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE))
    {
    switch (PerIoData->Operation)
    {
    case  CPerOperationData::ACCEPT_COMPLETED:
    {
        // This case is fired when a new connection is made
        while (1) {}
    }
}
}

我只有一个未完成的 AcceptEx() 电话;当它被新连接填充时,我 post 另一个。我不等待在 AcceptEx().

中接收数据

我创建完成端口如下:

CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4)

现在,因为我在完成端口只允许 4 个线程,所以我认为 因为我让线程忙碌(即它们不进入等待状态),当我尝试建立第五个连接,完成数据包不会出队,因此会挂起!然而,这种情况并非如此;我可以与我的服务器建立 5 个甚至 6 个连接!这表明即使我允许的最大线程数 (4) 已经 运行,我仍然可以使数据包出队? 这就是我困惑的原因!

完成端口 - 确实是 KQUEUE 对象。 NumberOfConcurrentThreads 对应于 MaximumCount

Maximum number of concurrent threads the queue can satisfy waits for.

来自 I/O 完成端口

When the total number of runnable threads associated with the completion port reaches the concurrency value, the system blocks the execution of any subsequent threads associated with that completion port until the number of runnable threads drops below the concurrency value.

不好说的也不是很清楚。当线程调用 KeRemoveQueue ( GetQueuedCompletionStatus internal call it) system return packet to thread only if Queue->CurrentCount < Queue->MaximumCount even if exist packets in queue. system not blocks any threads of course. from another side look for KiInsertQueue - even if some threads wait on packets - it activated only in case Queue->CurrentCount < Queue->MaximumCount.

另请查看 Queue->CurrentCount 的更改方式和时间。查找 KiActivateWaiterQueue (This function is called when the current thread is about to enter a wait state) and KiUnlinkThread. in general - when thread begin wait for any object (or another queue) system call KiActivateWaiterQueue - it decrement CurrentCount and possible (if exist packets in queue and became Queue->CurrentCount < Queue->MaximumCount and threads waited for packets) return packet to wait thread. from another side, when thread stop wait - KiUnlinkThread 被调用。它增加 CurrentCount.

你的两个变体都是错误的。任意数量的线程都可以调用 GetQueuedCompletionStatus()。系统当然不会阻止任何后续线程的执行。例如 - 你有 MaximumCount = 4 的队列。您可以将 10 个数据包排队。并从 7 个线程并发调用 GetQueuedCompletionStatus()。但只有 4 个收到数据包。另一个将等待(尽管队列中还有 6 个数据包)。如果一些从队列中删除数据包的线程开始等待 - 系统只是不等待并且 return 数据包到另一个线程等待队列。或者如果线程(之前已经从该队列中删除数据包(Thread->Queue == Queue) - 所以活动线程)再次调用 KeRemoveQueue 将是 Queue->CurrentCount -= 1;