C++ 套接字上的异步 I/O

Asynchronous I/O on C++ Sockets

我正在编写多线程套接字应用程序(针对 windows),但是,在连接多个客户端时出现问题。我可以从服务器向客户端发送消息,但是,我只能从一个客户端向服务器发送消息。其他客户端无法向服务器发送消息。我用谷歌搜索了一些东西,发现 Overlapping/Asynchronous I/O 是正确的选择。这只有一个问题,我不知道如何实现它,所以我基本上是在问我将如何去做,或者这种方法是否错误。

我要将 RecieveFromClients() 函数设为异步。

提前致谢。

这是我的代码:

main.cpp

#include "server.h"

int main()
{
    Server server;

    server.StartServer();

    return 0;
}

Server.h

#include <WinSock2.h>
#include <WS2tcpip.h>
#include <iphlpapi.h>
#include <stdio.h>

#include <string>
#include <iostream>
#include <vector>
#include <thread>

#pragma comment(lib, "Ws2_32.lib")

#define DEFAULT_PORT 4566
#define BACKLOG 10

class Server
{
public:
    Server();
    ~Server();
    int StartServer();
private:
    int Socket();
    int Bind();
    int Listen();
    void AcceptConnections();
    int StopServer();
    int CloseClientSocket(int client_num);

    void GetClients(int server_socket, std::vector<int>* clients,
        std::vector<sockaddr_in> *client_addrs);

    void SendToClients();
    void RecieveFromClients(int id);
private:
    sockaddr_in server_addr;
    int connected_clients, counted_clients;
    int server_socket;
    int result;
    int msgSize;
    std::vector<int> clients;
    std::vector<sockaddr_in> client_addrs;
private:
    std::thread get_clients;
    std::thread send_messages;
    std::thread recieve_messages;
};

Server.cpp

#include "server.h"

Server::Server()
    :
    connected_clients(0),
    counted_clients(0),
    server_socket(0),
    result(0)
{
    ZeroMemory(&server_addr, 0);

    WSAData wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
    {
        printf("wsastartip falied\n");
    }
}

Server::~Server()
{
    WSACleanup();
}

int Server::StartServer()
{
    if (Socket() != 0)
    {
        return 1;
    }

    if (Bind() != 0)
    {
        return 1;
    }

    if (Listen() != 0)
    {
        return 1;
    }

    AcceptConnections();

    return 0;
}

int Server::Socket()
{
    server_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (server_socket == INVALID_SOCKET)
    {
        std::cout << "Failed to create socket" << std::endl;
        return 1;
    }

    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(DEFAULT_PORT);
    server_addr.sin_addr.S_un.S_addr = INADDR_ANY;

    return 0;
}

int Server::Bind()
{
    result = bind(server_socket, (sockaddr*)&server_addr,
        sizeof(server_addr));
    if (result == SOCKET_ERROR)
    {
        std::cout << "Failed to bind socket" << std::endl;
        return 1;
    }

    return 0;
}

int Server::Listen()
{
    result = listen(server_socket, BACKLOG);
    if (result == SOCKET_ERROR)
    {
        std::cout << "Listening failed" << std::endl;
        return 1;
    }

    return 0;
}

void Server::AcceptConnections()
{
    get_clients = std::thread(&Server:: GetClients, this, server_socket,
        &clients, &client_addrs);
    send_messages = std::thread(&Server::SendToClients, this);
    recieve_messages = std::thread(&Server::RecieveFromClients, this, counted_clients);
    get_clients.join();
    send_messages.join();
    recieve_messages.join();
}

int Server::StopServer()
{
    std::terminate();

    for (int client : clients)
    {
        result = closesocket(client);
        if (result == SOCKET_ERROR)
        {
            std::cout << "Failed to close client socket" << std::endl;
            return 1;
        }
    }

    return 0;
}

int Server::CloseClientSocket(int client_num)
{
    result = closesocket(clients[client_num]);
    if (result == SOCKET_ERROR)
    {
        std::cout << "Failed to close client socket" << std::endl;
        return 1;
    }
    return 0;
}

void Server::GetClients(int server_socket, std::vector<int>* clients,
    std::vector<sockaddr_in>* client_addrs)
{
    
    while(true)
    {
        sockaddr_in client_addr = { 0 };
        socklen_t client_addrstrlen = sizeof(client_addr);
        int client;

        client = accept(server_socket, (sockaddr*)&client_addr,
            &client_addrstrlen);

        clients->push_back(client);
        client_addrs->push_back(client_addr);
        ++connected_clients;

        char ip[INET_ADDRSTRLEN] = "";
        char port[100] = "";
        inet_ntop(AF_INET, &client_addr.sin_addr, ip, sizeof(ip));
        std::cout << "Client connected from " << ip << ":" << 
            client_addr.sin_port << std::endl;
    }
}

void Server::SendToClients()
{
    std::string msg;
    do
    {
        msg.clear();
        getline(std::cin, msg);
        if (msg.size() > 255)
        {
            std::cout << "Message must be less than 256 bytes"
                << std::endl;
            continue;
        }

        for (int client : clients)
        {
            int size;
            size = send(client, msg.data(), msg.size(), 0);
            if (size == SOCKET_ERROR)
            {
                std::cout << "Failed to send message to client"
                    << std::endl;
            }
        }
    } while (msg != "exit");

    if (StopServer() != 0)
    {
        std::cout << "Failed to close client sockets" << std::endl;
    }
}

void Server::RecieveFromClients(int id)
{
    std::vector<char> msgBuffer(256);
    do
    {
        
        if (connected_clients > 0)
        {
            msgBuffer.clear();
            msgBuffer.resize(256);
            char ip[INET_ADDRSTRLEN];
            inet_ntop(AF_INET, &client_addrs[id].sin_addr, ip, sizeof(ip));

            if (msgSize = recv(clients[id], msgBuffer.data(),
                msgBuffer.size(), 0) > 0)
            {
                std::cout << ip << ": ";
                for (char c : msgBuffer)
                {
                    if (c != 0)
                    {
                        std::cout << c;
                    }
                }
                std::cout << std::endl;
            }
            else
            {
                if (msgSize == SOCKET_ERROR)
                {
                    std::cout << "Failed to recieve data" << std::endl;
                    break;
                }
                else if (clients[id] > 0)
                {
                    std::cout << "Client " << ip << " has disconnected" << std::endl;
                    CloseClientSocket(0);
                    break;
                }
            }
        }
        else
        {
            continue;
        }
    } while (true);
}

使用 Overlapped I/O 将对您的代码设计进行相当大的更改。但幸运的是,有一个更简单的解决方案。在您的 RecieveFromClients() 方法中,您可以使用 select() 来确定哪些客户端套接字在您尝试从中读取 之前 实际上有待读取的数据。您正在使用阻塞套接字,因此对 recv() 的调用将阻塞调用线程,直到收到数据,因此在您真正准备好读取某些内容之前,您不想执行阻塞读取。

此外,由于您没有为每个接受的客户端创建新线程,因此 RecieveFromClients()id 参数和 CloseClientSocket()client_num 参数是使用不当,应该完全删除。接收函数应该 运行 对连接的客户端列表进行循环。并且关闭函数应该使用特定的套接字句柄来关闭。

也就是说,您的代码的另一个主要设计问题是您在多个线程之间共享变量和容器,但您没有同步对它们中的任何一个的访问。这会给你带来长期的大问题 运行.