IO Completion ports: separate thread pool to process the dequeued packets?

Note: I have added the C++ tag to this because a) the code is C++ and b) people who use C++ may well have used IO completion ports. So please don't shout.


I am playing with IO completion ports, and have finally fully understood (and tested, to prove it), both with help from RbMm, the meaning of the NumberOfConcurrentThreads parameter of CreateIoCompletionPort().

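I have the following small program, which creates 10 threads that all wait on the completion port. I tell my completion port to allow only 4 threads to be runnable at once (I have four CPUs). I then enqueue 8 packets to the port. My thread function outputs a message if it dequeues a packet with an ID > 4; for this message to be output, at least one of the four currently running threads has to stop, which happens when I enter "1" at the console.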

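Now this is all fairly simple code. I have one big concern, however: if all of the threads that are processing a completion packet get bogged down, no more packets can be dequeued and processed. That is what I am simulating with my infinite loop; the fact that no more packets are dequeued until I enter "1" at the console highlights this potential problem!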

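Wouldn't a better solution be to have my four threads (or as many threads as there are CPUs) dequeue packets and then, when one is dequeued, hand the processing of that packet to a worker thread from a separate pool, thereby removing the risk of all threads in the IOCP being bogged down so that no more packets are dequeued?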

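I ask this because all the examples of IO completion port code I have seen use a method similar to the one I show below, not the separate thread pool I propose. This is what makes me think that I am missing something, because I'm outnumbered!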

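Note: this is a somewhat contrived example, because Windows will allow an additional packet to be dequeued if one of the runnable threads enters a wait state; I show this in my code with the commented-out cout call: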

The system also allows a thread waiting in GetQueuedCompletionStatus to process a completion packet if another running thread associated with the same I/O completion port enters a wait state for other reasons, for example the SuspendThread function. When the thread in the wait state begins running again, there may be a brief period when the number of active threads exceeds the concurrency value. However, the system quickly reduces this number by not allowing any new active threads until the number of active threads falls below the concurrency value.

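But I won't be calling SuspendThread in my thread functions, and I don't know which functions other than cout will cause a thread to enter a wait state, so I can't predict whether one or more of my threads will ever get bogged down! Hence my idea of a thread pool; at least context switching would mean that other packets get a chance to be dequeued!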

#define _CRT_SECURE_NO_WARNINGS
#include <windows.h>
#include <thread>
#include <vector>
#include <algorithm>
#include <atomic>
#include <ctime>
#include <iostream>
#include <functional>   // mem_fn
#include <iterator>     // back_inserter

using namespace std;

int main()
{
    HANDLE hCompletionPort1;
    if ((hCompletionPort1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4)) == NULL)
    {
        return -1;
    }
    vector<thread> vecAllThreads;
    atomic_bool bStop(false);

    // Fill our vector with 10 threads, each of which waits on our IOCP.
    generate_n(back_inserter(vecAllThreads), 10, [hCompletionPort1, &bStop] {
        thread t([hCompletionPort1, &bStop]()
        {
            // Thread body
            while (true)
            {
                DWORD dwBytes = 0;
                LPOVERLAPPED pOverlapped = 0;
                ULONG_PTR uKey;
                if (::GetQueuedCompletionStatus(hCompletionPort1, &dwBytes, &uKey, &pOverlapped, INFINITE) != FALSE)
                {
                    if (dwBytes == 0 && uKey == 0 && pOverlapped == 0)
                        break;  // Special completion packet; end processing.

                    //cout << uKey; // EVEN THIS WILL CAUSE A "wait" which causes MORE THAN 4 THREADS TO ENTER!

                    if (uKey > 4)
                        cout << "Started processing packet ID > 4!" << endl;
                    while (!bStop)
                        ;   // INFINITE LOOP
                    delete pOverlapped; // Release the OVERLAPPED allocated in main() for this packet
                }
            }
        });
        return t;   // Returning the local directly lets the compiler move or elide
    }
    );

    // Queue 8 completion packets to our IOCP...only four will be processed until we set our bool
    for (int i = 1; i <= 8; ++i)
    {
        PostQueuedCompletionStatus(hCompletionPort1, 0, i, new OVERLAPPED);
    }

    while (!bStop)
    {
        int nVal;
        cout << "Enter 1 to cause current processing threads to end: ";
        cin >> nVal;
        bStop = (nVal == 1);
    }
    for (int i = 0; i < 10; ++i)    // Tell all 10 threads to stop processing on the IOCP
    {
        PostQueuedCompletionStatus(hCompletionPort1, 0, 0, 0);  // Special packet marking end of IOCP usage
    }
    for_each(begin(vecAllThreads), end(vecAllThreads), mem_fn(&thread::join));

    return 0;
}


Edit #1

What I mean by "separate thread pool" is something like this:

class myThread {
public:
    void SetTask(LPOVERLAPPED pO) { /* start processing pO*/ }
private:
    thread m_thread;    // Actual thread object
};

// The threads in this thread pool are not associated with the IOCP in any way whatsoever; they exist
// purely to be handed a completion packet which they then process!
class ThreadPool
{
public:
    void Initialise() { /* create 100 worker threads and add them to some internal storage */ }
    myThread& GetNextFreeThread() { /* return one of the 100 worker threads we created */ }
} g_threadPool;

The code for each of my four threads associated with the IOCP then changes to

if (::GetQueuedCompletionStatus(hCompletionPort1, &dwBytes, &uKey, &pOverlapped, INFINITE) != FALSE)
{
    if (dwBytes == 0 && uKey == 0 && pOverlapped == 0)
        break;  // Special completion packet; end processing.

    // Pick a new thread from a pool of pre-created threads and assign it the packet to process
    myThread& thr = g_threadPool.GetNextFreeThread();
    thr.SetTask(pOverlapped);

    // Now, this thread can immediately return to the IOCP; it doesn't matter if the
    // packet we dequeued would take forever to process; that is happening in the 
    // separate thread thr *that will not interfere with packets being dequeued from IOCP!*
}

This way, there is no possibility that I end up in a situation where no more packets are being dequeued!
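A minimal, portable sketch of what such a separate pool could look like, assuming a plain task queue built only from the C++ standard library rather than anything IOCP-specific. The names `WorkerPool`, `Submit`, and `Run` are hypothetical and not part of any Windows API. Unlike the `GetNextFreeThread()` outline, this variant queues tasks and lets whichever worker is free pick them up.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical worker pool: a fixed set of threads draining a shared task queue.
// These threads never touch the IOCP; they only run whatever is handed to them.
class WorkerPool {
public:
    explicit WorkerPool(std::size_t threadCount) {
        for (std::size_t i = 0; i < threadCount; ++i)
            m_threads.emplace_back([this] { Run(); });
    }

    // Lets already-queued tasks finish, then joins every worker.
    ~WorkerPool() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopping = true;
        }
        m_wake.notify_all();
        for (auto& t : m_threads)
            t.join();
    }

    WorkerPool(const WorkerPool&) = delete;
    WorkerPool& operator=(const WorkerPool&) = delete;

    // Called by a dequeuing thread; returns as soon as the task is queued.
    void Submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_wake.notify_one();
    }

private:
    void Run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_wake.wait(lock, [this] { return m_stopping || !m_tasks.empty(); });
                if (m_tasks.empty())
                    return;  // Stopping and nothing left to do.
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();  // Runs outside the lock so other workers can keep dequeuing.
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_wake;
    std::queue<std::function<void()>> m_tasks;
    bool m_stopping = false;
    std::vector<std::thread> m_threads;  // Declared last so the members above exist before any worker starts.
};
```

A dequeuing thread could then call something like `pool.Submit([pOverlapped] { ProcessPacket(pOverlapped); });` (where `pool` and `ProcessPacket` are again hypothetical) and go straight back to `GetQueuedCompletionStatus`. Note that the queue in this sketch is unbounded, so if the workers stall, packets pile up in memory instead of in the IOCP, and the pool's threads are no longer subject to the port's concurrency limit.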

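It seems that there are conflicting opinions on whether a separate thread pool should be used. Clearly, as the sample code I have posted shows, packets can stop being dequeued from the IOCP if the processing of the packets never enters a wait state; the infinite loop is perhaps unrealistic, but it does demonstrate the point.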