IO 完成端口返回 NULL Completion Key

IO Completion port returning NULL Completion Key

第一次使用IO完成端口。我遇到一个问题,其中 GetQueuedCompletionStatus returns 完成键为 Null,我用它来传递带有代码其他部分句柄的数据结构。

GetQueuedCompletionStatus 似乎触发了收到的消息,否则就没问题了。

我试图只包含涉及 IO 完成端口的代码:

数据结构:

    typedef struct _THREAD_MESSAGE
{
    mutex cmd_mtx;
    string command;
} THREAD_MESSAGE, * LPTHREAD_MESSAGE;

typedef struct _LISTEN_SOCKET_DATA
{
    SOCKET Socket;
    int    Port;
    HANDLE hAcceptEvent;
    HANDLE IOCP;
    VOID* MessageProcessor;
    ConfigHandler* CfgHandle;
    // Other information useful to be associated with the handle
} LISTEN_SOCKET_DATA, * LPLISTEN_SOCKET_DATA;

typedef struct _CONNECTED_SOCKET_DATA
{
    SOCKET Socket;
    int Port;
    HANDLE IOCP;
    VOID* MessageProcessor;
    ConfigHandler* CfgHandle;
} CONNECTED_SOCKET_DATA, * LPCONNECTED_SOCKET_DATA;

#define OPERATION_TYPE_UNKNOWN      0
#define OPERATION_TYPE_SEND         1
#define OPERATION_TYPE_RECV         2
typedef struct
{
    OVERLAPPED* Overlapped;
    CHAR Buffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;

完成端口初始化为:

m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

听众:

//Thread for handling Listener sockets and Accepting connections
DWORD ListenThread(LPVOID lpParam)
{
    LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
    WSANETWORKEVENTS NetworkEvents;
    DWORD dwRet;
    SOCKADDR_IN NewSockAddr;
    SOCKET      NewSocket;
    int         nLen;

    while (true) //run forever
    {
        //Wait for event
        dwRet = WSAWaitForMultipleEvents(1,
            &(pSocketData->hAcceptEvent),
            false,
            100,
            false);

        //Nothing happened, back to top
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        //We got a event, find out which one.
        int nRet = WSAEnumNetworkEvents(pSocketData->Socket,
            pSocketData->hAcceptEvent,
            &NetworkEvents);
        if (nRet == SOCKET_ERROR)
        {
            wprintf(L"WSAEnumNetworkEvents error %ld\n", WSAGetLastError());
            break;
        }

        //We got a Accept event
        if (NetworkEvents.lNetworkEvents & FD_ACCEPT)
        {
            //Check for errors
            if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
            {
                
                // Accept new connection
                nLen = sizeof(SOCKADDR_IN);
                NewSocket = accept(pSocketData->Socket,
                    (LPSOCKADDR)&NewSockAddr,
                    &nLen);
                if (NewSocket == SOCKET_ERROR)
                {
                    wprintf(L"accept() error %ld\n", WSAGetLastError());
                    break;
                }

                wprintf(L"Accepted Connection %ld", NewSockAddr.sin_addr.S_un.S_addr);

                //Set new connection as TCP connection, No Delay
                const char chOpt = 1;
                int nErr = setsockopt(NewSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
                if (nErr == -1)
                {
                    wprintf(L"setsockopt() error %ld\n", WSAGetLastError());
                    break;
                }


                LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;

                ZeroMemory(ConnectedSocketData, sizeof(CONNECTED_SOCKET_DATA));

                ConnectedSocketData->Socket = NewSocket;
                ConnectedSocketData->Port = pSocketData->Port;
                ConnectedSocketData->IOCP = pSocketData->IOCP;
                ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
                ConnectedSocketData->MessageProcessor = pSocketData->MessageProcessor;

                //Add the new socket to the completion port, message from the socker will be queued up for proccessing by worker threads.
                if (CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD)ConnectedSocketData, 0) == NULL)
                {
                    wprintf(L"CreateIOCompletionPort error %ld\n", WSAGetLastError());
                    delete ConnectedSocketData;
                    ConnectedSocketData = NULL;
                    closesocket(NewSocket);
                    break;
                }                

                //Set the PerIOData, will be used at completion time
                LPPER_IO_OPERATION_DATA PerIOOperationData = new PER_IO_OPERATION_DATA;
                PerIOOperationData->BufferLen = 0;
                PerIOOperationData->OperationType = OPERATION_TYPE_RECV;
                DWORD RecvBytes = 0;
                DWORD Flags = 0;
                WSABUF DataBuf;
                DataBuf.len = DATA_BUFSIZE;
                DataBuf.buf = PerIOOperationData->Buffer;
                PerIOOperationData->Overlapped = new OVERLAPPED;
                ZeroMemory(PerIOOperationData->Overlapped, sizeof(OVERLAPPED));

                //Kick off the first Recv request for the Socket, will be handled by the completion Queue.
                if (WSARecv(NewSocket, &DataBuf, 1, &RecvBytes, &Flags, (PerIOOperationData->Overlapped), NULL) == SOCKET_ERROR)
                {
                    wprintf(L"WSARecv error %ld\n", WSAGetLastError());
                    return 0;
                }
            }
            else
            {
                wprintf(L"Unknown network event error %ld\n", WSAGetLastError());
                break;
            }
        }
    }
}

工作线程,由于结构为空而尝试使用 ConnectedSocketData 时崩溃:

    // Worker thread, processes IOCP messages.
DWORD ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;
    DWORD BytesTransferred = 0;
    OVERLAPPED* lpOverlapped = NULL;
    LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    DWORD Flags = 0;
    WSABUF DataBuf;
    DWORD RecvBytes = 0;
    int DestinationAddress = 0;

    while (TRUE)//run forever
    {
        //Check for new message
        if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&ConnectedSocketData, (LPOVERLAPPED*)&lpOverlapped, INFINITE) == 0)
        {
            DWORD Err = GetLastError();
            if (Err != WAIT_TIMEOUT)
            {
                printf("GetQueuedCompletionStatus() failed with error %d\n", Err);

                if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                {
                    printf("closesocket() failed with error %d\n", WSAGetLastError());
                    return 0;
                }

                GlobalFree(ConnectedSocketData);
            }
            continue;
        }

        // retrieve IO data
        PerIoData = CONTAINING_RECORD(lpOverlapped, PER_IO_OPERATION_DATA, Overlapped);

        vector<SiteData>::iterator SiteDataIterator;
        vector<InstrumentData>::iterator InstrumentDataIterator;

        for (SiteDataIterator = ConnectedSocketData->CfgHandle->SiteConnections.begin();
            SiteDataIterator != ConnectedSocketData->CfgHandle->SiteConnections.end();
            SiteDataIterator++)
        {
            if (SiteDataIterator->Port == ConnectedSocketData->Port)
            {
                break;
            }
        }

为什么 IOCP 没有传递完成密钥的任何想法?

Any Ideas why the IOCP is not passing the Completion Key?

当然,它会将您传递给 CreateIoCompletionPort 和 I/O 的内容准确地传回 OVERLAPPED

但一开始

CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD)ConnectedSocketData, 0)

是错误的-这里只使用了ConnectedSocketData的低32位,必须是

CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD_PTR)ConnectedSocketData, 0)

那么,你对PER_IO_OPERATION_DATA

的定义
typedef struct
{
    OVERLAPPED* Overlapped;
    CHAR Buffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;

总是严重错误。必须是

typedef struct
{
    OVERLAPPED Overlapped;
    CHAR Buffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;

Overlapped(定义为OVERLAPPED* Overlapped时)无法收回PER_IO_OPERATION_DATA的地址。但是从 &Overlapped (当它定义为 OVERLAPPED Overlapped )已经是可能的。

我最好做下一个定义

struct PER_IO_OPERATION_DATA : public OVERLAPPED
{
    CHAR Buffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
};

并使用

WSARecv(NewSocket, &DataBuf, 1, &RecvBytes, &Flags, PerIOOperationData, NULL)

PerIoData = static_cast<PER_IO_OPERATION_DATA*>(lpOverlapped);

GetQueuedCompletionStatus return false 时你处理错误 - 你在这种情况下泄漏 PerIoData 它。

这只是您代码中最常见的错误,与您的直接问题相关