IO 完成端口的 Recv 缓冲区始终为空

Recv Buffer for IO Completion port always empty

好的,这与之前的 post 有关,但这是一个不同的错误,所以我提出了一个新问题。上一个 Post:

当我设置的 IO 完成端口上有一条接收消息到达时,GetQueuedCompletionStatus 会被触发,并返回完成键(completion key)和重叠(OVERLAPPED)数据。两者看起来都正常,我可以看到它们的结构中已经填充了数据。然而,传递给 WSARecv 的缓冲区并没有被传入的消息填充(BytesTransferred 表明收到了字节,但 WSABUF 中没有数据)。

这是目前的代码,寻求有关为什么未填充缓冲区的帮助。

NetworkHandlerThread.cpp

#include "NetworkHandlerThread.h"

// Worker thread: services completion packets dequeued from the I/O completion port.
//
// lpParam is the completion-port HANDLE handed to CreateThread. Every packet carries
//   - the completion key: the CONNECTED_SOCKET_DATA registered for the socket in
//     ListenThread (allocated there with `new`), and
//   - the OVERLAPPED pointer, which is reinterpreted as the PER_IO_OPERATION_DATA
//     that contains it (allocated in ListenThread with GlobalAlloc).
//
// Returns 0 when the completion port itself can no longer be read; otherwise it
// loops forever.
DWORD ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;

    // Not used yet: the receive/send handling below is still a stub.
    Type1MessageParser Type1MsgParser;
    Type2MessageParser Type2MsgParser;

    while (TRUE)//run forever
    {
        // Reset the outputs on every pass so a failed dequeue can never be mistaken
        // for (or act upon) the contexts of a previous packet.
        DWORD BytesTransferred = 0;
        LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
        LPPER_IO_OPERATION_DATA PerIoData = NULL;

        //Check for new message
        if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&ConnectedSocketData, (LPOVERLAPPED*)&PerIoData, INFINITE) == 0)
        {
            DWORD Err = GetLastError();

            if (PerIoData == NULL)
            {
                // No packet was dequeued, so the completion key is not valid either.
                if (Err == WAIT_TIMEOUT)
                {
                    continue;
                }

                // The port is unusable (for example it was closed); retrying would
                // only spin, so end this worker.
                printf("GetQueuedCompletionStatus() failed with error %lu\n", Err);
                return 0;
            }

            // A packet for a FAILED I/O operation was dequeued: both contexts are valid.
            printf("GetQueuedCompletionStatus() failed with error %lu\n", Err);

            if (ConnectedSocketData != NULL)
            {
                if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                {
                    // Log only; the contexts must still be released below.
                    printf("closesocket() failed with error %d\n", WSAGetLastError());
                }

                // Allocated with `new` in ListenThread, so it must be released with
                // `delete`. This is safe only while each socket has a single
                // outstanding operation, as is the case for the code in this file.
                delete ConnectedSocketData;
            }

            // Allocated with GlobalAlloc in ListenThread.
            GlobalFree(PerIoData);
            continue;
        }

        if (PerIoData == NULL)
        {
            // A packet without an OVERLAPPED carries no operation to dispatch on.
            continue;
        }

        //We have a message, determine if it's something we received or something we should send.
        if (PerIoData->OperationType == OPERATION_TYPE_RECV)
        {
            ///tbd process recv
            // The payload is the first BytesTransferred bytes of PerIoData->cBuffer
            // (PerIoData->Buffer.buf points at the same storage). Treat it as raw
            // bytes, not as a C string: binary data may begin with 0x00, which makes
            // the buffer look empty when it is viewed as text in a debugger.
            // A receive that completes with BytesTransferred == 0 means the peer
            // closed the connection.
        }
        else if (PerIoData->OperationType == OPERATION_TYPE_SEND)
        {
            ///tbd process send
        }
    }
}


// Thread that waits for FD_ACCEPT on one listening socket and accepts connections.
//
// lpParam is the LISTEN_SOCKET_DATA built by NetworkHandlerThread::AddListenThread.
// For every accepted connection the thread
//   1. allocates a CONNECTED_SOCKET_DATA (with `new`) and registers it as the
//      completion key of the new socket on the listener's completion port, and
//   2. allocates a PER_IO_OPERATION_DATA (with GlobalAlloc) and posts the first
//      overlapped WSARecv, whose completion is handled by the worker threads.
//
// Returns 0 when the loop ends because of an unrecoverable listener error.
DWORD ListenThread(LPVOID lpParam)
{
    LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
    WSANETWORKEVENTS NetworkEvents;

    while (true) //run forever
    {
        //Wait for event (100 ms poll)
        DWORD dwRet = WSAWaitForMultipleEvents(1,
            &(pSocketData->hAcceptEvent),
            false,
            100,
            false);

        //Nothing happened, back to top
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        if (dwRet == WSA_WAIT_FAILED)
        {
            wprintf(L"WSAWaitForMultipleEvents error %d\n", WSAGetLastError());
            break;
        }

        //We got a event, find out which one.
        int nRet = WSAEnumNetworkEvents(pSocketData->Socket,
            pSocketData->hAcceptEvent,
            &NetworkEvents);
        if (nRet == SOCKET_ERROR)
        {
            wprintf(L"WSAEnumNetworkEvents error %ld\n", WSAGetLastError());
            break;
        }

        if ((NetworkEvents.lNetworkEvents & FD_ACCEPT) == 0)
            continue;

        //Check for errors: the failure code of the event lives in iErrorCode,
        //not in the thread's last-error value.
        if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] != 0)
        {
            wprintf(L"Unknown network event error %ld\n", (long)NetworkEvents.iErrorCode[FD_ACCEPT_BIT]);
            break;
        }

        // Accept new connection
        SOCKADDR_IN NewSockAddr;
        int nLen = sizeof(SOCKADDR_IN);
        SOCKET NewSocket = WSAAccept(pSocketData->Socket,
            (LPSOCKADDR)&NewSockAddr,
            &nLen, NULL, NULL);
        if (NewSocket == INVALID_SOCKET)
        {
            int AcceptErr = WSAGetLastError();

            // WSAEventSelect puts the listening socket in non-blocking mode, so a
            // connection that vanished before the accept shows up as WSAEWOULDBLOCK.
            if (AcceptErr == WSAEWOULDBLOCK)
                continue;

            wprintf(L"accept() error %ld\n", (long)AcceptErr);
            break;
        }

        wprintf(L"Accepted Connection %ld", NewSockAddr.sin_addr.S_un.S_addr);

        // TCP_NODELAY is not set on the accepted socket.

        LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;

        ZeroMemory(ConnectedSocketData, sizeof(CONNECTED_SOCKET_DATA));

        ConnectedSocketData->Socket = NewSocket;
        ConnectedSocketData->Port = pSocketData->Port;
        ConnectedSocketData->IOCP = pSocketData->IOCP;
        ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
        ConnectedSocketData->ForwardMessager = pSocketData->ForwardMessager;

        //Add the new socket to the completion port, messages from the socket will be queued up for processing by worker threads.
        if (CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD_PTR)ConnectedSocketData, 0) == NULL)
        {
            wprintf(L"CreateIOCompletionPort error %ld\n", WSAGetLastError());
            delete ConnectedSocketData;
            ConnectedSocketData = NULL;
            closesocket(NewSocket);
            break;
        }

        //Set the PerIOData, will be used at completion time
        // NOTE(review): PER_IO_OPERATION_DATA has a `string PacketName` member and
        // GlobalAlloc does not run its constructor; PacketName must not be used
        // unless it is constructed first.
        LPPER_IO_OPERATION_DATA PerIoData;
        PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));
        if (PerIoData == NULL)
        {
            wprintf(L"GlobalAlloc error %lu\n", GetLastError());
            closesocket(NewSocket);
            delete ConnectedSocketData;
            continue;
        }

        ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
        PerIoData->BufferLen = 0;
        PerIoData->OperationType = OPERATION_TYPE_RECV;
        PerIoData->Buffer.buf = PerIoData->cBuffer;
        PerIoData->Buffer.len = DATA_BUFSIZE;

        DWORD Flags = 0;

        //Kick off the first Recv request for the Socket, will be handled by the completion Queue.
        // The byte-count output is NULL because the operation is overlapped: the
        // count is delivered with the completion packet instead.
        if (WSARecv(NewSocket, &(PerIoData->Buffer), 1, NULL, &Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR)
        {
            int RecvErr = WSAGetLastError();

            // WSA_IO_PENDING is the normal outcome of an overlapped receive: the
            // request was queued and will complete on the completion port.
            if (RecvErr != WSA_IO_PENDING)
            {
                wprintf(L"WSARecv error %ld\n", (long)RecvErr);

                // The request was never queued, so no completion will arrive for
                // it; drop this connection and keep the listener running.
                closesocket(NewSocket);
                GlobalFree(PerIoData);
                delete ConnectedSocketData;
                continue;
            }
        }
    }

    return 0;
}


// Builds a handler that has no completion port and no listen thread yet;
// both handles start out as 0 until StartNetworkHandler / AddListenThread run.
NetworkHandlerThread::NetworkHandlerThread()
    : m_CompletionPort(0),
      m_hListenThread(0)
{
}

// Destructor. The body is empty: it does not close m_CompletionPort or
// m_hListenThread and does not call WSACleanup.
// NOTE(review): whether those resources are released elsewhere cannot be seen
// from this file - confirm before relying on it.
NetworkHandlerThread::~NetworkHandlerThread()
{

}

void NetworkHandlerThread::StartNetworkHandler()
{
    int iResult = 0;
    SYSTEM_INFO SystemInfo;
    unsigned int i = 0;

    //Start WSA
    iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iResult != NO_ERROR) {
        wprintf(L"WSAStartup() failed with error: %d\n", iResult);
        return;
    }

    //Start Completion Port
    m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (m_CompletionPort != NULL)
    {
        wprintf(L"Completion Port Created\n");
    }

    //Get # of system processors
    GetSystemInfo(&SystemInfo);

    //create Worker Threads for each processor.
    for (i = 0; i < SystemInfo.dwNumberOfProcessors * THREADS_PER_PROCESSOR; i++)
    {
        HANDLE ThreadHandle;

        // Create a server worker thread, and pass the
        // completion port to the thread. 
        ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, m_CompletionPort, 0, NULL);

        // Close the thread handle
        if (ThreadHandle != NULL)
        {
            CloseHandle(ThreadHandle);
        }
    }
}

void NetworkHandlerThread::AddListenThread(int Port,
    ConfigHandler* pConfigHandle,
    void* ForwardHandle)
{
    SOCKADDR_IN InternetAddr;
    int iResult = 0;
    LPLISTEN_SOCKET_DATA pListenSocketData = new LISTEN_SOCKET_DATA;

    if (pListenSocketData == NULL)
    {
        return;
    }

    //Create the listener Socket
    pListenSocketData->Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (pListenSocketData->Socket == INVALID_SOCKET)
    {
        wprintf(L"socket function failed with error: %ld\n", WSAGetLastError());
        WSACleanup();
        return;
    }

    // Create a Event to handle Socket Accepts
    pListenSocketData->hAcceptEvent = WSACreateEvent();
    if (pListenSocketData->hAcceptEvent == WSA_INVALID_EVENT)
    {
        wprintf(L"WSACreateEvent() error %ld\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        return;
    }

    // Set the Event to Trigger on FD_ACCEPT (this occurs on socket connection attempts)
    int nRet = WSAEventSelect(pListenSocketData->Socket,
        pListenSocketData->hAcceptEvent,
        FD_ACCEPT);
    if (nRet == SOCKET_ERROR)
    {
        wprintf(L"WSAAsyncSelect() error %ld\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        return;
    }

    //Assign the Port Number
    InternetAddr.sin_family = AF_INET;
    InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    InternetAddr.sin_port = htons(Port);
    pListenSocketData->Port = Port;
    pListenSocketData->IOCP = m_CompletionPort;
    pListenSocketData->CfgHandle = pConfigHandle;
    pListenSocketData->ForwardMessager = ForwardHandle;

    //Bind the Socket to the Port
    iResult = ::bind((pListenSocketData->Socket), (sockaddr*)&InternetAddr, sizeof(InternetAddr));
    if (iResult == SOCKET_ERROR) {
        wprintf(L"bind function failed with error %d\n", WSAGetLastError());
        iResult = closesocket(pListenSocketData->Socket);
        if (iResult == SOCKET_ERROR)
            wprintf(L"closesocket function failed with error %d\n", WSAGetLastError());
        WSACleanup();
        return;
    }

    //Listen for incoming connection requests.
    if (listen(pListenSocketData->Socket, SOMAXCONN) == SOCKET_ERROR)
    {
        wprintf(L"listen function failed with error: %d\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        WSACleanup();
        return;
    }

    wprintf(L"Listening on %ld", Port);

    m_hListenThread = (HANDLE)CreateThread(NULL,                // Security
        0,                  // Stack size - use default
        ListenThread,  // Thread fn entry point
        (void*)pListenSocketData, //Listen Socket Data
        0,                  // Init flag
        NULL);  // Thread address
}

NetworkHandlerThread.h

#pragma once
#include <WinSock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#include "ForwardMessageHandler.h"
#include "ConfigHandler.h"
#include "Type1MessageParser.h"
#include "Type2Message-Parser.h"
#include "ThreadUtilities.h"

#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2

// Front end of the IOCP network layer: holds the Winsock start-up data, the
// completion port handle and the listen thread handle, and exposes the calls
// that start the worker threads and add listeners.
class NetworkHandlerThread
{
public:
    NetworkHandlerThread();
    ~NetworkHandlerThread();

    // Starts Winsock, creates the completion port and launches the worker threads.
    void StartNetworkHandler();

    // Opens a listening socket on Port and starts a thread accepting connections
    // on it; pConfigHandle and ForwardHandle are passed along to that thread.
    void AddListenThread(int Port,
        ConfigHandler* pConfigHandle,
        void* ForwardHandle);

    WSADATA wsaData;            // filled in by WSAStartup in StartNetworkHandler
    HANDLE m_CompletionPort;    // completion port shared by workers and listeners
    HANDLE m_hListenThread;     // handle of the most recently started listen thread
};

ThreadUtilities.h

#pragma once
#include <mutex>
#include "ConfigHandler.h"


using namespace std;

#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2

// A command string bundled with a mutex.
// NOTE(review): presumably cmd_mtx guards access to command - no user of this
// type is visible here, confirm against the callers.
typedef struct _THREAD_MESSAGE
{
    mutex cmd_mtx;
    string command;
} THREAD_MESSAGE, * LPTHREAD_MESSAGE;

// Context of one listening socket. It is filled in by
// NetworkHandlerThread::AddListenThread and passed to ListenThread as its
// thread argument.
typedef struct _LISTEN_SOCKET_DATA
{
    SOCKET Socket;              // the listening socket
    int    Port;                // port number the socket is bound to
    HANDLE hAcceptEvent;        // event selected for FD_ACCEPT on Socket
    HANDLE IOCP;                // completion port accepted sockets are attached to
    VOID* ForwardMessager;      // opaque handle copied to each accepted connection
    ConfigHandler* CfgHandle;   // configuration handle copied to each accepted connection
    // Other information useful to be associated with the handle
} LISTEN_SOCKET_DATA, * LPLISTEN_SOCKET_DATA;

// Context of one accepted connection. ListenThread allocates it with `new`,
// copies the listener's values into it and registers it as the socket's
// completion key, so worker threads receive it from GetQueuedCompletionStatus.
typedef struct _CONNECTED_SOCKET_DATA
{
    SOCKET Socket;              // the accepted socket
    int Port;                   // port of the listener that accepted the connection
    HANDLE IOCP;                // completion port the socket is attached to
    VOID* ForwardMessager;      // opaque handle copied from the listener context
    ConfigHandler* CfgHandle;   // configuration handle copied from the listener context
} CONNECTED_SOCKET_DATA, * LPCONNECTED_SOCKET_DATA;

#define OPERATION_TYPE_UNKNOWN      0
#define OPERATION_TYPE_SEND         1
#define OPERATION_TYPE_RECV         2
// Context of one overlapped socket operation. A pointer to `overlapped` is
// handed to WSARecv, and the worker thread casts the OVERLAPPED pointer returned
// by GetQueuedCompletionStatus back to this structure.
struct PER_IO_OPERATION_DATA
{
    OVERLAPPED overlapped;      // must stay the FIRST member: the cast described above relies on it
    WSABUF Buffer;              // descriptor passed to WSARecv; points at cBuffer
    char cBuffer[DATA_BUFSIZE]; // storage for the transferred bytes (raw data, may contain 0x00)
    int BufferLen;
    int OperationType;          // one of the OPERATION_TYPE_* values
    string PacketName;          // non-trivial member: not constructed when the block comes from GlobalAlloc
};

// Pointer alias used throughout the code (`LPPER_IO_OPERATION_DATA p; p->...`).
// It has to be a real pointer type; aliasing it to the structure itself makes
// every such use ill-formed.
typedef PER_IO_OPERATION_DATA* LPPER_IO_OPERATION_DATA;

原来这是盯着代码看了太久之后犯的一个愚蠢错误(睡了一个好觉后马上就发现了):Recv 收到的是十六进制(二进制)数据,而且第一个字节是 0x00,因此在调试器中按文本字符串查看时显示为空;进一步检查内存后发现,所有字节其实都在缓冲区中。

以上是 IOCP 套接字的一个很好的例子,所以我将把它留在这里供人们参考。