IO 完成端口初始读取和双向数据
IO Completion Port Initial Read and Bi-Directional Data
我有以下简化的 IO 完成端口服务器 C++ 代码:
int main(..)
{
startCompletionPortThreadProc();
// Await client connection
sockaddr_in clientAddress;
int clientAddressSize = sizeof( clientAddress );
SOCKET acceptSocket = WSAAccept( serverSocket, (SOCKADDR*)&clientAddress, &clientAddressSize, NULL, NULL);
// Connected
CreateIoCompletionPort( (HANDLE)acceptSocket, completionPort, 0, 0 );
// Issue initial read
read( acceptSocket );
}
DWORD WINAPI completionPortThreadProc( LPVOID param )
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
while( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
{
if( WaitForSingleObject( exitEvent, 0 ) == WAIT_OBJECT_0 )
{
break;
}
if( !perIoData )
continue;
if( bytesTransferred == 0 )
{
//TODO
}
switch( perIoData->operation )
{
case OPERATION_READ:
{
// Bytes have been received
if( bytesTransferred < perIoData->WSABuf.len )
{
// Terminate string
perIoData->WSABuf.buf[bytesTransferred] = '[=10=]';
perIoData->WSABuf.buf[bytesTransferred+1] = '[=10=]';
}
// Add data to message build
message += std::tstring( (TCHAR*)perIoData->WSABuf.buf );
// Perform next read
perIoData->WSABuf.len = sizeof( perIoData->inOutBuffer );
perIoData->flags = 0;
if( WSARecv( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesTransferred, &( perIoData->flags ), &( perIoData->overlapped ), NULL ) == 0 )
{
// Part message
continue;
}
if( WSAGetLastError() == WSA_IO_PENDING )
{
// End of message
//TODO: Process message here
continue;
}
}
}
break;
case OPERATION_WRITE:
{
perIoData->bytesSent += bytesTransferred;
if( perIoData->bytesSent < perIoData->bytesToSend )
{
perIoData->WSABuf.buf = (char*)&( perIoData->inOutBuffer[perIoData->bytesSent] );
perIoData->WSABuf.len = ( perIoData->bytesToSend - perIoData->bytesSent);
}
else
{
perIoData->WSABuf.buf = (char*)perIoData->inOutBuffer;
perIoData->WSABuf.len = _tcslen( perIoData->inOutBuffer ) * sizeof( TCHAR );
perIoData->bytesSent = 0;
perIoData->bytesToSend = perIoData->WSABuf.len;
}
if( perIoData->bytesToSend )
{
if( WSASend( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesTransferred, 0, &( perIoData->overlapped ), NULL ) == 0 )
continue;
if( WSAGetLastError() == WSA_IO_PENDING )
continue;
}
}
break;
}
}
return 0;
}
bool SocketServer::read( SOCKET socket, HANDLE completionPort )
{
PER_IO_DATA* perIoData = new PER_IO_DATA;
ZeroMemory( perIoData, sizeof( PER_IO_DATA ) );
perIoData->socket = socket;
perIoData->operation = OPERATION_READ;
perIoData->WSABuf.buf = (char*)perIoData->inOutBuffer;
perIoData->WSABuf.len = sizeof( perIoData->inOutBuffer );
perIoData->overlapped.hEvent = WSACreateEvent();
DWORD bytesReceived = 0;
if( WSARecv( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesReceived, &( perIoData->flags ), &( perIoData->overlapped ), NULL ) == SOCKET_ERROR )
{
int gle = WSAGetLastError();
if( WSAGetLastError() != WSA_IO_PENDING )
{
delete perIoData;
return false;
}
}
return true;
}
bool SocketServer::write( SOCKET socket, std::tstring& data )
{
PER_IO_DATA* perIoData = new PER_IO_DATA;
ZeroMemory( perIoData, sizeof( PER_IO_DATA ) );
perIoData->socket = socket;
perIoData->operation = OPERATION_WRITE;
perIoData->WSABuf.buf = (char*)data.c_str();
perIoData->WSABuf.len = _tcslen( data.c_str() ) * sizeof( TCHAR );
perIoData->bytesToSend = perIoData->WSABuf.len;
perIoData->overlapped.hEvent = WSACreateEvent();
DWORD bytesSent = 0;
if( WSASend( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesSent, 0, &( perIoData->overlapped ), NULL ) == SOCKET_ERROR )
{
if( WSAGetLastError() != WSA_IO_PENDING )
{
delete perIoData;
return false;
}
}
return true;
}
1) 我遇到的第一个问题是初读。
在客户端连接(接受)时,我发出读取。由于客户端尚未发送任何数据,WSAGetLastError() 是 WSA_IO_PENDING 和读取方法 returns.
当客户端随后发送数据时,线程仍然停留在 GetQueuedCompletionStatus 调用中(因为我假设我需要另一个 WSARecv 调用?)。
我应该一直循环读取方法直到数据到达吗?这似乎不合逻辑,我认为通过发出初始读取 GetQueuedCompletionStatus 将在数据到达时完成。
2) 我需要在没有确认的情况下双向读写数据。因此,我还创建了一个带有 IOCP 线程的客户端。是否真的可以使用完成端口来执行此操作,或者是否必须在读取之后进行写入?
抱歉,我问的是基本问题,但在搜索互联网并构建 IOCP 示例后,我仍然无法回答这些问题。
非常感谢。
On client connection (accept), I issue a read. As the client hasn't sent any data yet, WSAGetLastError() is WSA_IO_PENDING and the read method returns.
这是正常行为。
When the client then sends data, the thread remains stuck in the GetQueuedCompletionStatus call (as I assume I need another WSARecv call?).
不,您不需要再打电话。如果它卡住了,那么你就没有正确地将读取与 I/O 完成端口相关联。
Am I supposed to keep looping the read method until data arrives?
没有。您需要调用 WSARecv()
一次以进行初始读取。 WSA_IO_PENDING
错误意味着读取正在等待数据,并且会在数据实际到达时向 I/O 完成端口发出信号。在该信号实际到达之前,请勿调用 WSARecv()
(或任何其他读取函数)。然后您可以再次调用 WSARecv()
以等待更多数据。重复直到套接字断开连接。
I thought by issuing the initial read GetQueuedCompletionStatus would complete when data arrived.
这正是应该发生的事情。
2) I need to read and write data bi-directional without acknowledgements. Therefore I've also created a client with the IOCP thread. Is it actually possible to do this with completion ports
是的。读写是分开的操作,互不依赖
does a read have to be followed by a write?
如果您的协议不需要,则不需要。
话虽如此,您的代码存在一些问题。
需要注意的是,WSAAccept()
是同步的,您应该考虑使用 AcceptEx()
,这样它就可以使用相同的 I/O 完成端口来报告新连接。
但更重要的是,当一个挂起的I/O操作失败时,GetQueuedCompletionStatus()
returns FALSE,返回的LPOVERLAPPED
指针将为非 NULL,并且 GetLastError()
将报告 I/O 操作失败的原因。但是,如果 GetQueuedCompletionStatus()
本身 失败,则返回的 LPOVERLAPPED
指针将为 NULL,并且 GetLastError()
将报告 GetQueuedCompletionStatus()
失败的原因。 documentation 中清楚地解释了这种差异,但您的 while
循环并未考虑到这一点。使用 do..while
循环代替并根据 LPOVERLAPPED
指针:
DWORD WINAPI completionPortThreadProc( LPVOID param )
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
do
{
if( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
{
// I/O success, handle perIoData based on completionKey as needed...
}
else if( perIoData )
{
// I/O failed, handle perIoData based on completionKey as needed...
}
else
{
// GetQueuedCompletionStatus() failure...
break;
}
}
while( WaitForSingleObject( exitEvent, 0 ) == WAIT_TIMEOUT );
return 0;
}
附带说明一下,考虑使用 PostQueuedCompletionionStatus()
而不是 post [=68= 的终止完成键,而不是使用事件对象来发出 completionPortThreadProc()
应该退出的信号] Completion Port,然后你的循环可以寻找那个值:
DWORD WINAPI completionPortThreadProc( LPVOID param )
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
do
{
if( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
{
if( completionKey == MyTerminateKey )
break;
if( completionKey == MySocketIOKey )
{
// I/O success, handle perIoData as needed...
}
}
else if( perIoData )
{
// I/O failed, handle perIoData based on completionKey as needed...
}
else
{
// GetQueuedCompletionStatus() failure...
break;
}
}
while( true );
return 0;
}
CreateIoCompletionPort( (HANDLE)acceptSocket, completionPort, MySocketIOKey, 0 );
PostQueuedCompletionStatus( completionPort, 0, MyTerminateKey, NULL );
我有以下简化的 IO 完成端口服务器 C++ 代码:
int main(..)
{
startCompletionPortThreadProc();
// Await client connection
sockaddr_in clientAddress;
int clientAddressSize = sizeof( clientAddress );
SOCKET acceptSocket = WSAAccept( serverSocket, (SOCKADDR*)&clientAddress, &clientAddressSize, NULL, NULL);
// Connected
CreateIoCompletionPort( (HANDLE)acceptSocket, completionPort, 0, 0 );
// Issue initial read
read( acceptSocket );
}
DWORD WINAPI completionPortThreadProc( LPVOID param )
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
while( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
{
if( WaitForSingleObject( exitEvent, 0 ) == WAIT_OBJECT_0 )
{
break;
}
if( !perIoData )
continue;
if( bytesTransferred == 0 )
{
//TODO
}
switch( perIoData->operation )
{
case OPERATION_READ:
{
// Bytes have been received
if( bytesTransferred < perIoData->WSABuf.len )
{
// Terminate string
perIoData->WSABuf.buf[bytesTransferred] = '[=10=]';
perIoData->WSABuf.buf[bytesTransferred+1] = '[=10=]';
}
// Add data to message build
message += std::tstring( (TCHAR*)perIoData->WSABuf.buf );
// Perform next read
perIoData->WSABuf.len = sizeof( perIoData->inOutBuffer );
perIoData->flags = 0;
if( WSARecv( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesTransferred, &( perIoData->flags ), &( perIoData->overlapped ), NULL ) == 0 )
{
// Part message
continue;
}
if( WSAGetLastError() == WSA_IO_PENDING )
{
// End of message
//TODO: Process message here
continue;
}
}
}
break;
case OPERATION_WRITE:
{
perIoData->bytesSent += bytesTransferred;
if( perIoData->bytesSent < perIoData->bytesToSend )
{
perIoData->WSABuf.buf = (char*)&( perIoData->inOutBuffer[perIoData->bytesSent] );
perIoData->WSABuf.len = ( perIoData->bytesToSend - perIoData->bytesSent);
}
else
{
perIoData->WSABuf.buf = (char*)perIoData->inOutBuffer;
perIoData->WSABuf.len = _tcslen( perIoData->inOutBuffer ) * sizeof( TCHAR );
perIoData->bytesSent = 0;
perIoData->bytesToSend = perIoData->WSABuf.len;
}
if( perIoData->bytesToSend )
{
if( WSASend( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesTransferred, 0, &( perIoData->overlapped ), NULL ) == 0 )
continue;
if( WSAGetLastError() == WSA_IO_PENDING )
continue;
}
}
break;
}
}
return 0;
}
bool SocketServer::read( SOCKET socket, HANDLE completionPort )
{
PER_IO_DATA* perIoData = new PER_IO_DATA;
ZeroMemory( perIoData, sizeof( PER_IO_DATA ) );
perIoData->socket = socket;
perIoData->operation = OPERATION_READ;
perIoData->WSABuf.buf = (char*)perIoData->inOutBuffer;
perIoData->WSABuf.len = sizeof( perIoData->inOutBuffer );
perIoData->overlapped.hEvent = WSACreateEvent();
DWORD bytesReceived = 0;
if( WSARecv( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesReceived, &( perIoData->flags ), &( perIoData->overlapped ), NULL ) == SOCKET_ERROR )
{
int gle = WSAGetLastError();
if( WSAGetLastError() != WSA_IO_PENDING )
{
delete perIoData;
return false;
}
}
return true;
}
bool SocketServer::write( SOCKET socket, std::tstring& data )
{
PER_IO_DATA* perIoData = new PER_IO_DATA;
ZeroMemory( perIoData, sizeof( PER_IO_DATA ) );
perIoData->socket = socket;
perIoData->operation = OPERATION_WRITE;
perIoData->WSABuf.buf = (char*)data.c_str();
perIoData->WSABuf.len = _tcslen( data.c_str() ) * sizeof( TCHAR );
perIoData->bytesToSend = perIoData->WSABuf.len;
perIoData->overlapped.hEvent = WSACreateEvent();
DWORD bytesSent = 0;
if( WSASend( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesSent, 0, &( perIoData->overlapped ), NULL ) == SOCKET_ERROR )
{
if( WSAGetLastError() != WSA_IO_PENDING )
{
delete perIoData;
return false;
}
}
return true;
}
1) 我遇到的第一个问题是初读。
在客户端连接(接受)时,我发出读取。由于客户端尚未发送任何数据,WSAGetLastError() 是 WSA_IO_PENDING 和读取方法 returns.
当客户端随后发送数据时,线程仍然停留在 GetQueuedCompletionStatus 调用中(因为我假设我需要另一个 WSARecv 调用?)。
我应该一直循环读取方法直到数据到达吗?这似乎不合逻辑,我认为通过发出初始读取 GetQueuedCompletionStatus 将在数据到达时完成。
2) 我需要在没有确认的情况下双向读写数据。因此,我还创建了一个带有 IOCP 线程的客户端。是否真的可以使用完成端口来执行此操作,或者是否必须在读取之后进行写入?
抱歉,我问的是基本问题,但在搜索互联网并构建 IOCP 示例后,我仍然无法回答这些问题。
非常感谢。
On client connection (accept), I issue a read. As the client hasn't sent any data yet, WSAGetLastError() is WSA_IO_PENDING and the read method returns.
这是正常行为。
When the client then sends data, the thread remains stuck in the GetQueuedCompletionStatus call (as I assume I need another WSARecv call?).
不,您不需要再打电话。如果它卡住了,那么你就没有正确地将读取与 I/O 完成端口相关联。
Am I supposed to keep looping the read method until data arrives?
没有。您需要调用 WSARecv()
一次以进行初始读取。 WSA_IO_PENDING
错误意味着读取正在等待数据,并且会在数据实际到达时向 I/O 完成端口发出信号。在该信号实际到达之前,请勿调用 WSARecv()
(或任何其他读取函数)。然后您可以再次调用 WSARecv()
以等待更多数据。重复直到套接字断开连接。
I thought by issuing the initial read GetQueuedCompletionStatus would complete when data arrived.
这正是应该发生的事情。
2) I need to read and write data bi-directional without acknowledgements. Therefore I've also created a client with the IOCP thread. Is it actually possible to do this with completion ports
是的。读写是分开的操作,互不依赖
does a read have to be followed by a write?
如果您的协议不需要,则不需要。
话虽如此,您的代码存在一些问题。
需要注意的是,WSAAccept()
是同步的,您应该考虑使用 AcceptEx()
,这样它就可以使用相同的 I/O 完成端口来报告新连接。
但更重要的是,当一个挂起的I/O操作失败时,GetQueuedCompletionStatus()
returns FALSE,返回的LPOVERLAPPED
指针将为非 NULL,并且 GetLastError()
将报告 I/O 操作失败的原因。但是,如果 GetQueuedCompletionStatus()
本身 失败,则返回的 LPOVERLAPPED
指针将为 NULL,并且 GetLastError()
将报告 GetQueuedCompletionStatus()
失败的原因。 documentation 中清楚地解释了这种差异,但您的 while
循环并未考虑到这一点。使用 do..while
循环代替并根据 LPOVERLAPPED
指针:
DWORD WINAPI completionPortThreadProc( LPVOID param )
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
do
{
if( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
{
// I/O success, handle perIoData based on completionKey as needed...
}
else if( perIoData )
{
// I/O failed, handle perIoData based on completionKey as needed...
}
else
{
// GetQueuedCompletionStatus() failure...
break;
}
}
while( WaitForSingleObject( exitEvent, 0 ) == WAIT_TIMEOUT );
return 0;
}
附带说明一下,考虑使用 PostQueuedCompletionionStatus()
而不是 post [=68= 的终止完成键,而不是使用事件对象来发出 completionPortThreadProc()
应该退出的信号] Completion Port,然后你的循环可以寻找那个值:
DWORD WINAPI completionPortThreadProc( LPVOID param )
{
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = NULL;
LPPER_IO_DATA perIoData = NULL;
do
{
if( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
{
if( completionKey == MyTerminateKey )
break;
if( completionKey == MySocketIOKey )
{
// I/O success, handle perIoData as needed...
}
}
else if( perIoData )
{
// I/O failed, handle perIoData based on completionKey as needed...
}
else
{
// GetQueuedCompletionStatus() failure...
break;
}
}
while( true );
return 0;
}
CreateIoCompletionPort( (HANDLE)acceptSocket, completionPort, MySocketIOKey, 0 );
PostQueuedCompletionStatus( completionPort, 0, MyTerminateKey, NULL );