TCP/IP IOCP 接收到的数据有时会损坏 - Windows 上的 Visual C++
TCP/IP IOCP received data sometimes corrupt - Visual C++ on Windows
我正在编写一个简单的测试 ICOP 客户端和服务器,以确保我正确使用 API 并且服务器正确接收客户端发送的数据。我已经包含了这个问题的所有代码。
这是我 运行 遇到一些问题的地方,接收缓冲区中的数据有时似乎已损坏(损坏是因为有时缓冲区中的数据块可能会乱序或丢失)。需要明确的是,这是单个接收缓冲区中的数据,我并不是说由于线程调度问题而导致多个缓冲区之间的顺序混乱。我之前发布了与此相关的问题 。但是,我在获得正确的代码示例方面做了更多的工作,所以我发布了一个新问题,并将 link 到此。我希望其他人能够 运行 这段代码并体验同样的奇怪行为。
测试代码
测试应用可以运行两种模式,客户端和服务器。 运行 服务器并开始侦听,运行 客户端并连接到服务器,一旦连接就会开始以允许的速度向服务器发送数据。然后服务器验证调用 WSARecv 后从 GetQueuedCompletionStatus 返回的每个缓冲区中的数据。每次 WSASend 完成时,我都会将结构的 OVERLAPPED 部分清零,然后使用原始数据缓冲区再次调用 WSASend。
客户端发送的每个数据缓冲区都是一个字节序列,一个字节一个字节递增,直到达到指定的最大值。我不会发送完整的 运行ge 0..255,以防大小正好适合数据包的倍数并以某种方式隐藏问题,因此在我的示例代码中字节 运行ge 来自 0..250。对于构建的每个发送缓冲区,我重复该模式 numberOfGroups 次。
这种格式应该意味着我可以有多个 WSARecv 未完成,然后完全独立于任何其他缓冲区验证返回缓冲区中的数据,这意味着不需要同步或重建顺序。即,我可以从第一个字节开始,验证它们是否一个接一个地递增到最大值,然后重置为 0。一旦我的测试没有问题,我就可以进行更复杂的操作,对接收到的缓冲区进行排序并验证更复杂的数据。
您可以在命令行上指定可以同时进行多少次未完成的 WSASend 和 WSARecv 调用。当有 2 个或更多未完成的 WSARecv 调用时,此问题似乎更常发生 far。对于 1,它可以 运行 一段时间后偶尔会检测到问题。
我一直在 Windows 7 上测试并使用 Visual Studio 2010 C++。
客户端和服务器同时调用的数量似乎有影响。两者都使用 2 似乎比某些组合更容易产生损坏的数据。
套接字和 IOCP 似乎需要相当多的样板代码才能启动和 运行ning 一个非常基本的客户端和服务器应用程序。接收缓冲区的实际代码只有几行,涉及调用 WSARecv 和处理来自 GetQueuedCompletionStatus 的已完成调用。
此代码调用 WSARecv
void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
DWORD numberOfBytesTransferred = 0;
DWORD flags = 0;
if (overlapped == nullptr)
{
overlapped = new TestOverlapped(receiveBufferSize);
overlapped->connection = this;
}
else
{
overlapped->reset();
}
overlapped->operation = soRecv;
auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}
当 WSARecv 调用完成时,它们由工作线程处理 - 我已经删除了与从该代码段接收数据无关的行
void IOCPWorker::execute()
{
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soRecv:
{
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
default:;
}
}
}
}
对 connection->onRecv 的调用是我验证数据的地方。这里有什么地方看起来明显不对吗?
我已经包含了完整的代码以供参考,如果你喜欢冒险的话应该编译一下。
完整参考资料
侦听端口 3000 且最多有 2 个未完成的 WSARecv 调用的服务器示例
> IOCPTest.exe server 3000 2
连接到端口 3000 上的 127.0.0.1 的客户端示例,最多有 2 个未完成的 WSASend 调用
> IOCPTest.exe client 127.0.0.1 3000 2
该程序由少量 classes
IOCPConnectionManager
这个 class 处理连接侦听并启动工作线程。
IOCPConnection
只跟踪 SOCKET 和一些处理异步调用的方法。 IOCPConnection::onRecv 在 WSARecv returns 时调用并验证缓冲区中的数据。如果发现数据顺序不对,它只会打印一条消息和 returns。
IOCPWorker
工作线程。 IOCPWorker::execute() 是调用 GetQueuedCompletionStatus 的地方。
TestOverlapped
所需的 OVERLAPPED 结构。
您还需要为 link 用户添加 Ws2_32.lib 和 Mswsock.lib。
主cpp文件
/************************************************************************
* *
* Test IOCP Client and Server - David Shaw *
* *
* There is limited error handling here and it assumes ideal conditions *
* Some allocated objects are not freed at the end, this is a test only *
* *
************************************************************************/
#include "stdafx.h"
#include <iostream>
#include <string>
#include "IOCPTest.h"
#include <Windows.h>
void printUse()
{
std::cout << "Invalid arguments" << std::endl;
std::cout << "This test app has very limited error handling or memory management" << std::endl;
std::cout << "Run as client or server (run the server first) e.g." << std::endl << std::endl;
std::cout << "To run as server listening on port 3000 with 2 pending receives:" << std::endl;
std::cout << "> IOCPTester.exe server 3000 2" << std::endl << std::endl;
std::cout << "To run as client connected to 127.0.0.1 on port 3000 with 2 pending sends:" << std::endl;
std::cout << "> IOCPTester.exe client 127.0.0.1 3000 2" << std::endl << std::endl;
std::cout << "Hit enter to exit" << std::endl;
std::cin.ignore();
}
int main(int argc, char *argv[])
{
if (argc < 4)
{
printUse();
return 0;
}
std::string mode(argv[1]);
if ((mode.compare("client") != 0) && (mode.compare("server") != 0))
{
printUse();
return 0;
}
IOCPTest::IOCPConnectionManager *manager = new IOCPTest::IOCPConnectionManager();
bool server = mode.compare("server") == 0;
if (server)
{
std::string listenPort(argv[2]);
std::string postedReceiveCount(argv[3]);
manager->listenPort = atoi(listenPort.c_str());
manager->postedReceiveCount = atoi(postedReceiveCount.c_str());
manager->postedSendCount = 1; // Not really used in this mode
manager->startListening();
}
else
{
if (argc < 5)
{
printUse();
return 0;
}
std::string host(argv[2]);
std::string port(argv[3]);
std::string postedSendCount(argv[4]);
manager->postedReceiveCount = 1; // Not really used in this mode
manager->postedSendCount = atoi(postedSendCount.c_str());
IOCPTest::IOCPConnection *connection = manager->createConnection();
connection->host = host;
connection->port = atoi(port.c_str());
connection->connect();
}
std::cout << "Hit enter to exit" << std::endl;
std::cin.ignore();
}
IOCPTest.h
/************************************************************************
* *
* Test IOCP Client and Server - David Shaw *
* *
* There is limited error handling here and it assumes ideal conditions *
* std::cout might not be the best approach in a multithreaded *
* environment but this is just a simple test app. *
* Some allocated objects are not cleaned up at the end either, but *
* again this is just a test. *
* *
************************************************************************/
#ifndef IOCPTestH
#define IOCPTestH
#endif
#include <WinSock2.h> // Include before as otherwise Windows.h includes and causes issues
#include <Windows.h>
#include <string>
namespace IOCPTest
{
class IOCPConnection;
enum IOCPSocketOperation
{
soUnknown,
soAccept,
soConnect,
soDisconnect,
soSend,
soRecv,
soQuit
};
struct TestOverlapped
{
OVERLAPPED overlapped;
WSABUF buffer;
IOCPSocketOperation operation;
IOCPConnection *connection;
bool resend; // Set this to keep sending the same data over and over
TestOverlapped(int bufferSize);
~TestOverlapped();
void reset();
};
typedef TestOverlapped *PTestOverlapped;
class IOCPConnectionManager
{
public:
static const int NUMACCEPTS = 5;
WSADATA wsaData;
HANDLE iocp;
SOCKET listenSocket;
USHORT listenPort;
int postedReceiveCount;
int postedSendCount;
void startListening();
void postAcceptEx();
IOCPConnection *createConnection();
IOCPConnectionManager();
};
class IOCPConnection
{
public:
SOCKET socket;
IOCPConnectionManager *manager;
std::string host;
USHORT port;
void onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void postRecv(PTestOverlapped overlapped = nullptr);
void onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void send(PTestOverlapped overlapped);
void onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void connect();
};
class IOCPWorker
{
public:
HANDLE threadHandle;
DWORD threadId;
IOCPConnectionManager *manager;
IOCPWorker(bool suspended);
void start();
void execute();
};
}
IOCPTest.cpp
#include "stdafx.h"
#include "IOCPTest.h"
#include <iostream>
#include <Mswsock.h>
#include <WS2tcpip.h>
#include <sstream>
namespace IOCPTest
{
LPFN_ACCEPTEX fnAcceptEx = nullptr;
LPFN_CONNECTEX fnConnectEx = nullptr;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidConnectEx = WSAID_CONNECTEX;
const byte maxByteExpected = 250;
const int numberOfGroups = 4096;
const int receiveBufferSize = 0x100000;
BOOL AcceptEx
(
SOCKET sListenSocket,
SOCKET sAcceptSocket,
PVOID lpOutputBuffer,
DWORD dwReceiveDataLength,
DWORD dwLocalAddressLength,
DWORD dwRemoteAddressLength,
LPDWORD lpdwBytesReceived,
LPOVERLAPPED lpOverlapped
)
{
if (fnAcceptEx == nullptr)
{
DWORD dwBytes;
int result = WSAIoctl(sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof (GuidAcceptEx), &fnAcceptEx, sizeof(fnAcceptEx), &dwBytes, NULL, NULL);
if (result != 0)
{
std::cerr << "Error calling WSAIoctl for AcceptEx" << std::endl;
return false;
}
}
return fnAcceptEx(sListenSocket, sAcceptSocket, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped);
}
BOOL ConnectEx(
SOCKET s,
const struct sockaddr FAR *name,
int namelen,
PVOID lpSendBuffer,
DWORD dwSendDataLength,
LPDWORD lpdwBytesSent,
LPOVERLAPPED lpOverlapped
)
{
if (fnConnectEx == nullptr)
{
DWORD dwBytes;
int result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx, sizeof (GuidConnectEx), &fnConnectEx, sizeof(fnConnectEx), &dwBytes, NULL, NULL);
if (result != 0)
{
std::cerr << "Error calling WSAIoctl for ConnectEx" << std::endl;
return false;
}
}
return fnConnectEx(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped);
}
// TestOverlapped
TestOverlapped::TestOverlapped(int bufferSize):
overlapped(),
operation(soUnknown),
connection(nullptr),
buffer(),
resend(false)
{
if (bufferSize > 0)
{
buffer.len = bufferSize;
buffer.buf = (CHAR*) malloc(bufferSize);
}
}
TestOverlapped::~TestOverlapped()
{
if (buffer.buf != nullptr)
{
free(buffer.buf);
}
}
void TestOverlapped::reset()
{
overlapped = OVERLAPPED();
}
// IOCPConnectionManager
IOCPConnectionManager::IOCPConnectionManager():
wsaData(),
listenSocket(0),
listenPort(0),
postedReceiveCount(1)
{
WSAStartup(WINSOCK_VERSION, &wsaData);
iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
SYSTEM_INFO systemInfo = SYSTEM_INFO();
GetSystemInfo(&systemInfo);
for (decltype(systemInfo.dwNumberOfProcessors) i = 0; i < systemInfo.dwNumberOfProcessors; i++)
{
IOCPWorker* worker = new IOCPWorker(true);
worker->manager = this;
worker->start();
}
}
void IOCPConnectionManager::startListening()
{
listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
CreateIoCompletionPort((HANDLE)listenSocket, iocp, ULONG_PTR(this), 0);
sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY; // Listen on all addresses
localAddress.sin_port = htons(listenPort);
if (bind(listenSocket, (SOCKADDR*) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error in binding listening socket" << std::endl;
}
if (listen(listenSocket, SOMAXCONN) == 0)
{
std::cout << "Listening on port " << listenPort << std::endl;
}
for (int i = 0; i < NUMACCEPTS; i++)
{
postAcceptEx();
}
}
void IOCPConnectionManager::postAcceptEx()
{
SOCKET acceptSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
IOCPConnection *connection = new IOCPConnection();
connection->manager = this;
connection->socket = acceptSocket;
CreateIoCompletionPort((HANDLE) acceptSocket, iocp, ULONG_PTR(connection), 0); // The thread count is ignored in this call when just associating the socket
PTestOverlapped overlapped = new TestOverlapped(2 * (sizeof(sockaddr_in) + 16)); // As specified in documentation
overlapped->operation = soAccept;
overlapped->connection = connection;
DWORD byesReceived = 0;
int result = IOCPTest::AcceptEx
(
listenSocket,
acceptSocket,
overlapped->buffer.buf,
0, // Size of initial receiving buffer, excluding the space at the end for the two addressed
sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
&byesReceived,
(LPOVERLAPPED) overlapped
);
if (!result)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling AcceptEx. Returned errorCode = " << errorCode << std::endl;
}
}
}
IOCPConnection *IOCPConnectionManager::createConnection()
{
IOCPConnection *connection = new IOCPConnection();
connection->manager = this;
return connection;
}
// IOCPConnection
void IOCPConnection::onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
manager->postAcceptEx(); // Replace this accept
auto returnCode = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (const char *)&manager->listenSocket, sizeof(manager->listenSocket));
if (returnCode == SOCKET_ERROR)
{
std::cerr << "SetSockOpt in OnAcceptEx returned SOCKET_ERROR" << std::endl;
}
std::cout << "Connection Accepted" << std::endl;
for (int i = 0; i < manager->postedReceiveCount; ++i)
{
postRecv();
}
}
void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
DWORD numberOfBytesTransferred = 0;
DWORD flags = 0;
if (overlapped == nullptr)
{
overlapped = new TestOverlapped(receiveBufferSize);
overlapped->connection = this;
}
else
{
overlapped->reset();
}
overlapped->operation = soRecv;
auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}
void IOCPConnection::onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
if (numberOfBytesTransferred > 0)
{
byte *data = (byte *)overlapped->buffer.buf;
if (data[0] > maxByteExpected)
{
std::cerr << "Byte greater than max expected found. Max Expected: " << maxByteExpected << "; Found: " << data[0] << std::endl;
return;
}
byte next = (data[0] == maxByteExpected)?0:data[0] + 1;
for (decltype(numberOfBytesTransferred) i = 1; i < numberOfBytesTransferred; ++i)
{
if (data[i] != next)
{
// Not really the best solution for writing data out from multiple threads. Test app only.
std::cerr << "Invalid data. Expected: " << (int)next << "; Got: " << (int)data[i] << std::endl;
return;
}
else if (next == maxByteExpected)
{
next = 0;
}
else
{
++next;
}
}
//std::cout << "Valid buffer processed" << std::endl;
}
}
void IOCPConnection::onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
for (int i = 0; i < manager->postedSendCount; ++i)
{
// Construct a sequence of incremented byte values 0..maxByteExpected repeated numberOfGroups
PTestOverlapped sendOverlapped = new TestOverlapped((maxByteExpected + 1) * numberOfGroups);
sendOverlapped->connection = this;
for (int j = 0; j < numberOfGroups; ++j)
{
for (byte k = 0; k <= maxByteExpected; ++k)
{
((byte *)sendOverlapped->buffer.buf)[(j * (maxByteExpected + 1)) + (int)k] = k;
}
}
sendOverlapped->resend = true; // Repeat sending this data
send(sendOverlapped);
}
}
void IOCPConnection::send(PTestOverlapped overlapped)
{
overlapped->reset();
overlapped->operation = soSend;
DWORD bytesSent = 0;
DWORD flags = 0;
if (WSASend(socket, &overlapped->buffer, 1, &bytesSent, flags, (LPWSAOVERLAPPED) overlapped, nullptr) == SOCKET_ERROR)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling WSASend. Returned errorCode = " << errorCode << std::endl;
}
}
}
void IOCPConnection::onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
}
void IOCPConnection::connect()
{
socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket == INVALID_SOCKET)
{
std::cerr << "Error calling socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) in IOCPConnection::connect()" << std::endl;
return;
}
CreateIoCompletionPort((HANDLE)socket, manager->iocp, ULONG_PTR(this), 0); // The thread count is ignored in this call when just associating the socket
sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY;
localAddress.sin_port = 0;
if (bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error calling bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress) in IOCPConnection::connect()" << std::endl;
return;
}
addrinfo hints = addrinfo();
addrinfo *remoteAddress = nullptr;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
std::stringstream ss;
ss << port;
//std::cout << ss.str() << std::endl;
if (getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) != 0)
{
std::cerr << "Error calling getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) in IOCPConnection::connect()" << std::endl;
return;
}
TestOverlapped *overlapped = new TestOverlapped(0);
overlapped->connection = this;
overlapped->operation = soConnect;
BOOL result = IOCPTest::ConnectEx
(
socket,
remoteAddress->ai_addr,
remoteAddress->ai_addrlen,
nullptr,
0,
nullptr,
LPOVERLAPPED(overlapped)
);
if (result == FALSE)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
//std::cerr << "Error calling ConnectEx. You'll need to add some more code if you want to know why :)" << std::endl;
std::cerr << "Error calling ConnectEx. Returned errorCode = " << errorCode << std::endl;
}
}
freeaddrinfo(remoteAddress);
}
// IOCPWorker
DWORD WINAPI IOCPWorkerThreadProc(LPVOID lpParam)
{
((IOCPWorker*)lpParam)->execute();
return 0;
}
IOCPWorker::IOCPWorker(bool suspended)
{
threadHandle = CreateThread(NULL, 0, IOCPWorkerThreadProc, this, (suspended)?CREATE_SUSPENDED:0, &threadId);
}
void IOCPWorker::start()
{
ResumeThread(threadHandle);
}
void IOCPWorker::execute()
{
//std::cout << "TMVIOCPWorker::execute()" << std::endl;
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soAccept:
{
IOCPConnection *connection = overlapped->connection;
connection->onAcceptEx(overlapped, numberOfBytesTransferred);
delete overlapped;
overlapped = nullptr;
break;
}
case soConnect:
{
std::cout << "ConnectEx returned" << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onConnect(overlapped, numberOfBytesTransferred); // This method validates the received data
delete overlapped;
overlapped = nullptr;
break;
}
case soRecv:
{
//std::cout << "Received Data: " << numberOfBytesTransferred << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data
overlapped->reset();
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
case soSend:
{
IOCPConnection *connection = overlapped->connection;
connection->onSent(overlapped, numberOfBytesTransferred);
// Send the same data over and over
std::cout << "Resending buffer" << std::endl;
if (overlapped->resend)
{
connection->send(overlapped);
}
else
{
delete overlapped;
}
overlapped = nullptr;
break;
}
default:;
}
}
}
}
}
接收到的大多数缓冲区都是正确的,但是当 运行 套接字有 2 个接收缓冲区和 2 个发送缓冲区时,我仍然有很多这样的滚动:
Invalid data. Expected: 169; Got: 123
Invalid data. Expected: 114; Got: 89
Invalid data. Expected: 89; Got: 156
Invalid data. Expected: 206; Got: 227
Invalid data. Expected: 125; Got: 54
Invalid data. Expected: 25; Got: 0
Invalid data. Expected: 58; Got: 146
Invalid data. Expected: 33; Got: 167
Invalid data. Expected: 212; Got: 233
Invalid data. Expected: 111; Got: 86
Invalid data. Expected: 86; Got: 153
Invalid data. Expected: 190; Got: 165
Invalid data. Expected: 175; Got: 150
Invalid data. Expected: 150; Got: 217
Invalid data. Expected: 91; Got: 112
Invalid data. Expected: 95; Got: 162
Invalid data. Expected: 207; Got: 182
Invalid data. Expected: 222; Got: 243
Invalid data. Expected: 126; Got: 101
Invalid data. Expected: 157; Got: 132
Invalid data. Expected: 160; Got: 89
Invalid data. Expected: 205; Got: 180
Invalid data. Expected: 113; Got: 134
Invalid data. Expected: 45; Got: 20
Invalid data. Expected: 113; Got: 201
Invalid data. Expected: 64; Got: 198
Invalid data. Expected: 115; Got: 182
Invalid data. Expected: 140; Got: 115
我希望这只是我做错的简单事情。我在发送之前对数据缓冲区进行了 运行 与我在接收时所做的相同的验证,以确保我没有在那里做一些愚蠢的事情,但它通过了检查。我使用 IOCP not 用另一种语言编写了一个服务器,它似乎可以正确接收数据。我还用另一种语言编写了一个客户端,在这种情况下,IOCP 服务器似乎也能检测到损坏。但话虽如此,客户端和服务器都可能存在问题。我很感激任何人愿意花在这上面的时间。
好的,我可能已经发现你的问题了。看一下收到的数据,所有的字节都是按顺序排列的,但是突然顺序跳动了,好像被另一个调用打断了一样。现在,来自 WSASend and WSARecv 上的 MSDN 文档:
If you are using I/O completion ports, be aware that the order of calls made to WSASend is also the order in which the buffers are populated. WSASend should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.
If you are using I/O completion ports, be aware that the order of calls made to WSARecv is also the order in which the buffers are populated. WSARecv should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.
就是这样。我现在真的不是你想要的好方法,但你所做的可能不是它应该使用的方式。
您是否在真实网络上尝试过?环回接口是一个特殊的电路,可能会有不同的行为,但它仍然是未定义的行为,所以你不应该依赖它。
测试了有问题的代码,似乎在单个套接字上对 WSARecv 的多个并发调用可能会导致传递给完成处理程序的结果缓冲区中的数据损坏。确保每个连接一次只发出一个 WSARecv 调用的锁将解决此问题。
这与 WSARecv 的当前 MSDN 文档一致。
If you are using I/O completion ports, be aware that the order of calls made to WSARecv is also the order in which the buffers are populated. WSARecv should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.
虽然我个人认为文档可以更清楚,因为它暗示 'order that the buffers are filled' 可能是一个问题 - 这是众所周知的和有记录的,但没有提到入站数据流实际上可以是在缓冲区之间不可预测地传播。
这对我来说很有趣,因为我从来不知道这是一个问题,而且 15 年来我从未见过它:) 但是,我依靠对多个 WSARecv 完成进行排序来避免众所周知的记录问题线程调度会影响您处理读取完成的顺序,即使它们保证按照它们进入的顺序从 IOCP 中出来。我的排序需要每个读取缓冲区中的序列号,因此我有一个锁序列号递增和调用 WSARecv.
鉴于不可能从多个线程发出多个 WSARecv 并成功地重新创建入站数据流,除非你能以某种方式确定从完成发出 WSARecvs 的顺序我看不出这实际上是一个TCP 套接字的现实世界问题。它可能会对 UDP 造成问题,但是因为不需要排序,所以不需要锁定,除非是为了防止这个问题,虽然我认为我从来没有意识到我已经看到它,但我认为这可能是一个我参与过的一个系统的问题...
我需要对 WSASend 端进行更多测试,但我没有理由认为它比 WSARecv 调用更可能是线程安全的。啊好吧,你每天都能学到新东西...
我已经在博客上写过这个 here。
我认为不要post多次接收
任何时候只为一个套接字制作一个接收缓冲区。
处理完所有数据后,再次调用 WSARecv 获取更多数据。
我正在编写一个简单的测试 ICOP 客户端和服务器,以确保我正确使用 API 并且服务器正确接收客户端发送的数据。我已经包含了这个问题的所有代码。
这是我 运行 遇到一些问题的地方,接收缓冲区中的数据有时似乎已损坏(损坏是因为有时缓冲区中的数据块可能会乱序或丢失)。需要明确的是,这是单个接收缓冲区中的数据,我并不是说由于线程调度问题而导致多个缓冲区之间的顺序混乱。我之前发布了与此相关的问题
测试代码
测试应用可以运行两种模式,客户端和服务器。 运行 服务器并开始侦听,运行 客户端并连接到服务器,一旦连接就会开始以允许的速度向服务器发送数据。然后服务器验证调用 WSARecv 后从 GetQueuedCompletionStatus 返回的每个缓冲区中的数据。每次 WSASend 完成时,我都会将结构的 OVERLAPPED 部分清零,然后使用原始数据缓冲区再次调用 WSASend。
客户端发送的每个数据缓冲区都是一个字节序列,一个字节一个字节递增,直到达到指定的最大值。我不会发送完整的 运行ge 0..255,以防大小正好适合数据包的倍数并以某种方式隐藏问题,因此在我的示例代码中字节 运行ge 来自 0..250。对于构建的每个发送缓冲区,我重复该模式 numberOfGroups 次。
这种格式应该意味着我可以有多个 WSARecv 未完成,然后完全独立于任何其他缓冲区验证返回缓冲区中的数据,这意味着不需要同步或重建顺序。即,我可以从第一个字节开始,验证它们是否一个接一个地递增到最大值,然后重置为 0。一旦我的测试没有问题,我就可以进行更复杂的操作,对接收到的缓冲区进行排序并验证更复杂的数据。
您可以在命令行上指定可以同时进行多少次未完成的 WSASend 和 WSARecv 调用。当有 2 个或更多未完成的 WSARecv 调用时,此问题似乎更常发生 far。对于 1,它可以 运行 一段时间后偶尔会检测到问题。
我一直在 Windows 7 上测试并使用 Visual Studio 2010 C++。
客户端和服务器同时调用的数量似乎有影响。两者都使用 2 似乎比某些组合更容易产生损坏的数据。
套接字和 IOCP 似乎需要相当多的样板代码才能启动和 运行ning 一个非常基本的客户端和服务器应用程序。接收缓冲区的实际代码只有几行,涉及调用 WSARecv 和处理来自 GetQueuedCompletionStatus 的已完成调用。
此代码调用 WSARecv
void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
DWORD numberOfBytesTransferred = 0;
DWORD flags = 0;
if (overlapped == nullptr)
{
overlapped = new TestOverlapped(receiveBufferSize);
overlapped->connection = this;
}
else
{
overlapped->reset();
}
overlapped->operation = soRecv;
auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}
当 WSARecv 调用完成时,它们由工作线程处理 - 我已经删除了与从该代码段接收数据无关的行
void IOCPWorker::execute()
{
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soRecv:
{
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
default:;
}
}
}
}
对 connection->onRecv 的调用是我验证数据的地方。这里有什么地方看起来明显不对吗?
我已经包含了完整的代码以供参考,如果你喜欢冒险的话应该编译一下。
完整参考资料
侦听端口 3000 且最多有 2 个未完成的 WSARecv 调用的服务器示例
> IOCPTest.exe server 3000 2
连接到端口 3000 上的 127.0.0.1 的客户端示例,最多有 2 个未完成的 WSASend 调用
> IOCPTest.exe client 127.0.0.1 3000 2
该程序由少量 classes
IOCPConnectionManager
这个 class 处理连接侦听并启动工作线程。
IOCPConnection
只跟踪 SOCKET 和一些处理异步调用的方法。 IOCPConnection::onRecv 在 WSARecv returns 时调用并验证缓冲区中的数据。如果发现数据顺序不对,它只会打印一条消息和 returns。
IOCPWorker
工作线程。 IOCPWorker::execute() 是调用 GetQueuedCompletionStatus 的地方。
TestOverlapped
所需的 OVERLAPPED 结构。
您还需要为 link 用户添加 Ws2_32.lib 和 Mswsock.lib。
主cpp文件
/************************************************************************
* *
* Test IOCP Client and Server - David Shaw *
* *
* There is limited error handling here and it assumes ideal conditions *
* Some allocated objects are not freed at the end, this is a test only *
* *
************************************************************************/
#include "stdafx.h"
#include <iostream>
#include <string>
#include "IOCPTest.h"
#include <Windows.h>
void printUse()
{
std::cout << "Invalid arguments" << std::endl;
std::cout << "This test app has very limited error handling or memory management" << std::endl;
std::cout << "Run as client or server (run the server first) e.g." << std::endl << std::endl;
std::cout << "To run as server listening on port 3000 with 2 pending receives:" << std::endl;
std::cout << "> IOCPTester.exe server 3000 2" << std::endl << std::endl;
std::cout << "To run as client connected to 127.0.0.1 on port 3000 with 2 pending sends:" << std::endl;
std::cout << "> IOCPTester.exe client 127.0.0.1 3000 2" << std::endl << std::endl;
std::cout << "Hit enter to exit" << std::endl;
std::cin.ignore();
}
int main(int argc, char *argv[])
{
if (argc < 4)
{
printUse();
return 0;
}
std::string mode(argv[1]);
if ((mode.compare("client") != 0) && (mode.compare("server") != 0))
{
printUse();
return 0;
}
IOCPTest::IOCPConnectionManager *manager = new IOCPTest::IOCPConnectionManager();
bool server = mode.compare("server") == 0;
if (server)
{
std::string listenPort(argv[2]);
std::string postedReceiveCount(argv[3]);
manager->listenPort = atoi(listenPort.c_str());
manager->postedReceiveCount = atoi(postedReceiveCount.c_str());
manager->postedSendCount = 1; // Not really used in this mode
manager->startListening();
}
else
{
if (argc < 5)
{
printUse();
return 0;
}
std::string host(argv[2]);
std::string port(argv[3]);
std::string postedSendCount(argv[4]);
manager->postedReceiveCount = 1; // Not really used in this mode
manager->postedSendCount = atoi(postedSendCount.c_str());
IOCPTest::IOCPConnection *connection = manager->createConnection();
connection->host = host;
connection->port = atoi(port.c_str());
connection->connect();
}
std::cout << "Hit enter to exit" << std::endl;
std::cin.ignore();
}
IOCPTest.h
/************************************************************************
* *
* Test IOCP Client and Server - David Shaw *
* *
* There is limited error handling here and it assumes ideal conditions *
* std::cout might not be the best approach in a multithreaded *
* environment but this is just a simple test app. *
* Some allocated objects are not cleaned up at the end either, but *
* again this is just a test. *
* *
************************************************************************/
#ifndef IOCPTestH
#define IOCPTestH
#endif
#include <WinSock2.h> // Include before as otherwise Windows.h includes and causes issues
#include <Windows.h>
#include <string>
namespace IOCPTest
{
class IOCPConnection;
enum IOCPSocketOperation
{
soUnknown,
soAccept,
soConnect,
soDisconnect,
soSend,
soRecv,
soQuit
};
struct TestOverlapped
{
OVERLAPPED overlapped;
WSABUF buffer;
IOCPSocketOperation operation;
IOCPConnection *connection;
bool resend; // Set this to keep sending the same data over and over
TestOverlapped(int bufferSize);
~TestOverlapped();
void reset();
};
typedef TestOverlapped *PTestOverlapped;
class IOCPConnectionManager
{
public:
static const int NUMACCEPTS = 5;
WSADATA wsaData;
HANDLE iocp;
SOCKET listenSocket;
USHORT listenPort;
int postedReceiveCount;
int postedSendCount;
void startListening();
void postAcceptEx();
IOCPConnection *createConnection();
IOCPConnectionManager();
};
class IOCPConnection
{
public:
SOCKET socket;
IOCPConnectionManager *manager;
std::string host;
USHORT port;
void onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void postRecv(PTestOverlapped overlapped = nullptr);
void onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void send(PTestOverlapped overlapped);
void onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
void connect();
};
class IOCPWorker
{
public:
HANDLE threadHandle;
DWORD threadId;
IOCPConnectionManager *manager;
IOCPWorker(bool suspended);
void start();
void execute();
};
}
IOCPTest.cpp
#include "stdafx.h"
#include "IOCPTest.h"
#include <iostream>
#include <Mswsock.h>
#include <WS2tcpip.h>
#include <sstream>
namespace IOCPTest
{
LPFN_ACCEPTEX fnAcceptEx = nullptr;
LPFN_CONNECTEX fnConnectEx = nullptr;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidConnectEx = WSAID_CONNECTEX;
const byte maxByteExpected = 250;
const int numberOfGroups = 4096;
const int receiveBufferSize = 0x100000;
BOOL AcceptEx
(
SOCKET sListenSocket,
SOCKET sAcceptSocket,
PVOID lpOutputBuffer,
DWORD dwReceiveDataLength,
DWORD dwLocalAddressLength,
DWORD dwRemoteAddressLength,
LPDWORD lpdwBytesReceived,
LPOVERLAPPED lpOverlapped
)
{
if (fnAcceptEx == nullptr)
{
DWORD dwBytes;
int result = WSAIoctl(sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof (GuidAcceptEx), &fnAcceptEx, sizeof(fnAcceptEx), &dwBytes, NULL, NULL);
if (result != 0)
{
std::cerr << "Error calling WSAIoctl for AcceptEx" << std::endl;
return false;
}
}
return fnAcceptEx(sListenSocket, sAcceptSocket, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped);
}
BOOL ConnectEx(
SOCKET s,
const struct sockaddr FAR *name,
int namelen,
PVOID lpSendBuffer,
DWORD dwSendDataLength,
LPDWORD lpdwBytesSent,
LPOVERLAPPED lpOverlapped
)
{
if (fnConnectEx == nullptr)
{
DWORD dwBytes;
int result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx, sizeof (GuidConnectEx), &fnConnectEx, sizeof(fnConnectEx), &dwBytes, NULL, NULL);
if (result != 0)
{
std::cerr << "Error calling WSAIoctl for ConnectEx" << std::endl;
return false;
}
}
return fnConnectEx(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped);
}
// TestOverlapped
TestOverlapped::TestOverlapped(int bufferSize):
overlapped(),
operation(soUnknown),
connection(nullptr),
buffer(),
resend(false)
{
if (bufferSize > 0)
{
buffer.len = bufferSize;
buffer.buf = (CHAR*) malloc(bufferSize);
}
}
TestOverlapped::~TestOverlapped()
{
if (buffer.buf != nullptr)
{
free(buffer.buf);
}
}
void TestOverlapped::reset()
{
overlapped = OVERLAPPED();
}
// IOCPConnectionManager
IOCPConnectionManager::IOCPConnectionManager():
wsaData(),
listenSocket(0),
listenPort(0),
postedReceiveCount(1)
{
WSAStartup(WINSOCK_VERSION, &wsaData);
iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
SYSTEM_INFO systemInfo = SYSTEM_INFO();
GetSystemInfo(&systemInfo);
for (decltype(systemInfo.dwNumberOfProcessors) i = 0; i < systemInfo.dwNumberOfProcessors; i++)
{
IOCPWorker* worker = new IOCPWorker(true);
worker->manager = this;
worker->start();
}
}
void IOCPConnectionManager::startListening()
{
listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
CreateIoCompletionPort((HANDLE)listenSocket, iocp, ULONG_PTR(this), 0);
sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY; // Listen on all addresses
localAddress.sin_port = htons(listenPort);
if (bind(listenSocket, (SOCKADDR*) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error in binding listening socket" << std::endl;
}
if (listen(listenSocket, SOMAXCONN) == 0)
{
std::cout << "Listening on port " << listenPort << std::endl;
}
for (int i = 0; i < NUMACCEPTS; i++)
{
postAcceptEx();
}
}
void IOCPConnectionManager::postAcceptEx()
{
SOCKET acceptSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
IOCPConnection *connection = new IOCPConnection();
connection->manager = this;
connection->socket = acceptSocket;
CreateIoCompletionPort((HANDLE) acceptSocket, iocp, ULONG_PTR(connection), 0); // The thread count is ignored in this call when just associating the socket
PTestOverlapped overlapped = new TestOverlapped(2 * (sizeof(sockaddr_in) + 16)); // As specified in documentation
overlapped->operation = soAccept;
overlapped->connection = connection;
DWORD byesReceived = 0;
int result = IOCPTest::AcceptEx
(
listenSocket,
acceptSocket,
overlapped->buffer.buf,
0, // Size of initial receiving buffer, excluding the space at the end for the two addressed
sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
&byesReceived,
(LPOVERLAPPED) overlapped
);
if (!result)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling AcceptEx. Returned errorCode = " << errorCode << std::endl;
}
}
}
IOCPConnection *IOCPConnectionManager::createConnection()
{
IOCPConnection *connection = new IOCPConnection();
connection->manager = this;
return connection;
}
// IOCPConnection
void IOCPConnection::onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
manager->postAcceptEx(); // Replace this accept
auto returnCode = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (const char *)&manager->listenSocket, sizeof(manager->listenSocket));
if (returnCode == SOCKET_ERROR)
{
std::cerr << "SetSockOpt in OnAcceptEx returned SOCKET_ERROR" << std::endl;
}
std::cout << "Connection Accepted" << std::endl;
for (int i = 0; i < manager->postedReceiveCount; ++i)
{
postRecv();
}
}
void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
DWORD numberOfBytesTransferred = 0;
DWORD flags = 0;
if (overlapped == nullptr)
{
overlapped = new TestOverlapped(receiveBufferSize);
overlapped->connection = this;
}
else
{
overlapped->reset();
}
overlapped->operation = soRecv;
auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}
void IOCPConnection::onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
if (numberOfBytesTransferred > 0)
{
byte *data = (byte *)overlapped->buffer.buf;
if (data[0] > maxByteExpected)
{
std::cerr << "Byte greater than max expected found. Max Expected: " << maxByteExpected << "; Found: " << data[0] << std::endl;
return;
}
byte next = (data[0] == maxByteExpected)?0:data[0] + 1;
for (decltype(numberOfBytesTransferred) i = 1; i < numberOfBytesTransferred; ++i)
{
if (data[i] != next)
{
// Not really the best solution for writing data out from multiple threads. Test app only.
std::cerr << "Invalid data. Expected: " << (int)next << "; Got: " << (int)data[i] << std::endl;
return;
}
else if (next == maxByteExpected)
{
next = 0;
}
else
{
++next;
}
}
//std::cout << "Valid buffer processed" << std::endl;
}
}
void IOCPConnection::onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
for (int i = 0; i < manager->postedSendCount; ++i)
{
// Construct a sequence of incremented byte values 0..maxByteExpected repeated numberOfGroups
PTestOverlapped sendOverlapped = new TestOverlapped((maxByteExpected + 1) * numberOfGroups);
sendOverlapped->connection = this;
for (int j = 0; j < numberOfGroups; ++j)
{
for (byte k = 0; k <= maxByteExpected; ++k)
{
((byte *)sendOverlapped->buffer.buf)[(j * (maxByteExpected + 1)) + (int)k] = k;
}
}
sendOverlapped->resend = true; // Repeat sending this data
send(sendOverlapped);
}
}
void IOCPConnection::send(PTestOverlapped overlapped)
{
overlapped->reset();
overlapped->operation = soSend;
DWORD bytesSent = 0;
DWORD flags = 0;
if (WSASend(socket, &overlapped->buffer, 1, &bytesSent, flags, (LPWSAOVERLAPPED) overlapped, nullptr) == SOCKET_ERROR)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling WSASend. Returned errorCode = " << errorCode << std::endl;
}
}
}
void IOCPConnection::onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
}
void IOCPConnection::connect()
{
socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket == INVALID_SOCKET)
{
std::cerr << "Error calling socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) in IOCPConnection::connect()" << std::endl;
return;
}
CreateIoCompletionPort((HANDLE)socket, manager->iocp, ULONG_PTR(this), 0); // The thread count is ignored in this call when just associating the socket
sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY;
localAddress.sin_port = 0;
if (bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error calling bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress) in IOCPConnection::connect()" << std::endl;
return;
}
addrinfo hints = addrinfo();
addrinfo *remoteAddress = nullptr;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
std::stringstream ss;
ss << port;
//std::cout << ss.str() << std::endl;
if (getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) != 0)
{
std::cerr << "Error calling getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) in IOCPConnection::connect()" << std::endl;
return;
}
TestOverlapped *overlapped = new TestOverlapped(0);
overlapped->connection = this;
overlapped->operation = soConnect;
BOOL result = IOCPTest::ConnectEx
(
socket,
remoteAddress->ai_addr,
remoteAddress->ai_addrlen,
nullptr,
0,
nullptr,
LPOVERLAPPED(overlapped)
);
if (result == FALSE)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
//std::cerr << "Error calling ConnectEx. You'll need to add some more code if you want to know why :)" << std::endl;
std::cerr << "Error calling ConnectEx. Returned errorCode = " << errorCode << std::endl;
}
}
freeaddrinfo(remoteAddress);
}
// IOCPWorker
DWORD WINAPI IOCPWorkerThreadProc(LPVOID lpParam)
{
((IOCPWorker*)lpParam)->execute();
return 0;
}
IOCPWorker::IOCPWorker(bool suspended)
{
threadHandle = CreateThread(NULL, 0, IOCPWorkerThreadProc, this, (suspended)?CREATE_SUSPENDED:0, &threadId);
}
void IOCPWorker::start()
{
ResumeThread(threadHandle);
}
void IOCPWorker::execute()
{
//std::cout << "TMVIOCPWorker::execute()" << std::endl;
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soAccept:
{
IOCPConnection *connection = overlapped->connection;
connection->onAcceptEx(overlapped, numberOfBytesTransferred);
delete overlapped;
overlapped = nullptr;
break;
}
case soConnect:
{
std::cout << "ConnectEx returned" << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onConnect(overlapped, numberOfBytesTransferred); // This method validates the received data
delete overlapped;
overlapped = nullptr;
break;
}
case soRecv:
{
//std::cout << "Received Data: " << numberOfBytesTransferred << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data
overlapped->reset();
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
case soSend:
{
IOCPConnection *connection = overlapped->connection;
connection->onSent(overlapped, numberOfBytesTransferred);
// Send the same data over and over
std::cout << "Resending buffer" << std::endl;
if (overlapped->resend)
{
connection->send(overlapped);
}
else
{
delete overlapped;
}
overlapped = nullptr;
break;
}
default:;
}
}
}
}
}
接收到的大多数缓冲区都是正确的,但是当 运行 套接字有 2 个接收缓冲区和 2 个发送缓冲区时,我仍然有很多这样的滚动:
Invalid data. Expected: 169; Got: 123
Invalid data. Expected: 114; Got: 89
Invalid data. Expected: 89; Got: 156
Invalid data. Expected: 206; Got: 227
Invalid data. Expected: 125; Got: 54
Invalid data. Expected: 25; Got: 0
Invalid data. Expected: 58; Got: 146
Invalid data. Expected: 33; Got: 167
Invalid data. Expected: 212; Got: 233
Invalid data. Expected: 111; Got: 86
Invalid data. Expected: 86; Got: 153
Invalid data. Expected: 190; Got: 165
Invalid data. Expected: 175; Got: 150
Invalid data. Expected: 150; Got: 217
Invalid data. Expected: 91; Got: 112
Invalid data. Expected: 95; Got: 162
Invalid data. Expected: 207; Got: 182
Invalid data. Expected: 222; Got: 243
Invalid data. Expected: 126; Got: 101
Invalid data. Expected: 157; Got: 132
Invalid data. Expected: 160; Got: 89
Invalid data. Expected: 205; Got: 180
Invalid data. Expected: 113; Got: 134
Invalid data. Expected: 45; Got: 20
Invalid data. Expected: 113; Got: 201
Invalid data. Expected: 64; Got: 198
Invalid data. Expected: 115; Got: 182
Invalid data. Expected: 140; Got: 115
我希望这只是我做错的简单事情。我在发送之前对数据缓冲区进行了 运行 与我在接收时所做的相同的验证,以确保我没有在那里做一些愚蠢的事情,但它通过了检查。我使用 IOCP not 用另一种语言编写了一个服务器,它似乎可以正确接收数据。我还用另一种语言编写了一个客户端,在这种情况下,IOCP 服务器似乎也能检测到损坏。但话虽如此,客户端和服务器都可能存在问题。我很感激任何人愿意花在这上面的时间。
好的,我可能已经发现你的问题了。看一下收到的数据,所有的字节都是按顺序排列的,但是突然顺序跳动了,好像被另一个调用打断了一样。现在,来自 WSASend and WSARecv 上的 MSDN 文档:
If you are using I/O completion ports, be aware that the order of calls made to WSASend is also the order in which the buffers are populated. WSASend should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.
If you are using I/O completion ports, be aware that the order of calls made to WSARecv is also the order in which the buffers are populated. WSARecv should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.
就是这样。我现在真的不是你想要的好方法,但你所做的可能不是它应该使用的方式。
您是否在真实网络上尝试过?环回接口是一个特殊的电路,可能会有不同的行为,但它仍然是未定义的行为,所以你不应该依赖它。
测试了有问题的代码,似乎在单个套接字上对 WSARecv 的多个并发调用可能会导致传递给完成处理程序的结果缓冲区中的数据损坏。确保每个连接一次只发出一个 WSARecv 调用的锁将解决此问题。
这与 WSARecv 的当前 MSDN 文档一致。
If you are using I/O completion ports, be aware that the order of calls made to WSARecv is also the order in which the buffers are populated. WSARecv should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.
虽然我个人认为文档可以更清楚,因为它暗示 'order that the buffers are filled' 可能是一个问题 - 这是众所周知的和有记录的,但没有提到入站数据流实际上可以是在缓冲区之间不可预测地传播。
这对我来说很有趣,因为我从来不知道这是一个问题,而且 15 年来我从未见过它:) 但是,我依靠对多个 WSARecv 完成进行排序来避免众所周知的记录问题线程调度会影响您处理读取完成的顺序,即使它们保证按照它们进入的顺序从 IOCP 中出来。我的排序需要每个读取缓冲区中的序列号,因此我有一个锁序列号递增和调用 WSARecv.
鉴于不可能从多个线程发出多个 WSARecv 并成功地重新创建入站数据流,除非你能以某种方式确定从完成发出 WSARecvs 的顺序我看不出这实际上是一个TCP 套接字的现实世界问题。它可能会对 UDP 造成问题,但是因为不需要排序,所以不需要锁定,除非是为了防止这个问题,虽然我认为我从来没有意识到我已经看到它,但我认为这可能是一个我参与过的一个系统的问题...
我需要对 WSASend 端进行更多测试,但我没有理由认为它比 WSARecv 调用更可能是线程安全的。啊好吧,你每天都能学到新东西...
我已经在博客上写过这个 here。
我认为不要post多次接收 任何时候只为一个套接字制作一个接收缓冲区。
处理完所有数据后,再次调用 WSARecv 获取更多数据。