IOCP Server

  • Thread starter Thread starter Neptune123
  • Start date Start date
N

Neptune123

Guest
I am working on an TCP/IP IOCP server application. My server accepts client request and make a new socket connection for each client and associate it with IOCP and start receiving data. The number of clients are 10. I use single worker thread . After initializing worker-thread and IOCP, I use the worker thread proc and get the completed IO request using GetQueuedCompletionStatus. When I receive data at a fast rate from clients the data within the receive buffer(WSABUF) of a client sometimes seems to be corrupted . I make new connection and maintain a separate buffer for each client. Sometimes the received data buffer of a client contains other clients data. The first n bytes seems to be corrupted or contains other client data.The remaining data are good. The size of the WSABUF is 65600. Here is my server accept and worker thread process code. I am not sure, Why I get corrupted data and data from other clients. Will anybody help to solve this? Thank you for your hepl.


////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CCIOCPDataServer::OnAccept
//
// DESCRIPTION: Accepts incoming clients
//
////////////////////////////////////////////////////////////////////////////////
void CIOCPDataServer::OnAccept()
{
SOCKET clientSocket;
DWORD dwRecvNumBytes = 0;
DWORD dwFlags = 0;

int nRet;

//
// accept the new socket descriptor
//
clientSocket = WSAAccept(m_socListen, NULL, NULL, NULL,
NULL);
if (clientSocket == INVALID_SOCKET)
{
nRet = WSAGetLastError();
if (nRet != WSAEWOULDBLOCK)
{
return;
}
}

SocketConnection* s = new SocketConnection(clientSocket, (POnSocketDataReceived2)m_onDataReceived);

m_hCompletionPort = CreateIoCompletionPort((HANDLE)clientSocket, m_hCompletionPort, (ULONG_PTR)s, 0);

if (!m_hCompletionPort)
{
delete s;
s = NULL;
closesocket( clientSocket );
return;
}

if (m_onConnected != NULL)
m_onConnected(s);

int result = WSARecv(clientSocket, s->GetBuffer(), 1, &dwRecvNumBytes, &dwFlags, s->GetOverlapped(), NULL);

if (result == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) //Report an error here or do something...this one won't be working
{
RemoveFromDataSocketsList(s,TRUE);
return;
}
}


////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CCIOCPDataServer::WorkerThreadProc
//
//
////////////////////////////////////////////////////////////////////////////////
unsigned __stdcall CIOCPDataServer::WorkerThreadProc (LPVOID thisContext)
{

CIOCPDataServer* pThis = reinterpret_cast<CIOCPDataServer*>(thisContext);

// Loop round and round servicing I/O completions.
while (WaitForSingleObject(pThis->m_hKillIOEvent, 0) == WAIT_TIMEOUT)
{

DWORD bytesTransferred;
SocketConnection* client;
LPOVERLAPPED lo = NULL;

// Get a completed IO request.
BOOL bSuccess = GetQueuedCompletionStatus(
pThis->m_hCompletionPort,
&bytesTransferred,
(PULONG_PTR)&client,
&lo, INFINITE);

if (bSuccess)
{
if(bytesTransferred == 0)
{
//client connection dropped
if(client != NULL)
{
pThis->RemoveFromDataSocketsList( client, TRUE );
}
continue;
}


client->DataReceived(bytesTransferred);

//Inform the client
SOCKET so; //Read more data

client->GetSocket(&so);

DWORD dwRecvNumBytes = 0;

DWORD dwFlags = 0;

int result = WSARecv(so, client->GetBuffer(), 1, &dwRecvNumBytes, &dwFlags, client->GetOverlapped(), NULL);
if (result == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
//Report an error here or do something...this one won't be working
if(client != NULL)
{
pThis->log_status_info("WSARecv function failed due to socket error\n");
pThis->RemoveFromDataSocketsList( client, TRUE );
}

}
}

else if (lo != NULL)
{
//It got a packet but there was an error
//Do something about it here if desired...in this case, we'll just close the socket
if(client != NULL)
{
pThis->log_status_info("Received data with error\n");
pThis->RemoveFromDataSocketsList( client, TRUE );
}
}
else
{
//Failed to dequeue a packet...not a problem if it just timed out due to a lack of traffic
pThis->log_status_info("Failed to dequeue a packet\n");
}
}

return 0;
}


///////////////////////////////////////////////////////////////////////
//
// FUNCTION: SocketConnection::SocketConnection
//
////////////////////////////////////////////////////////////////////////////////

SocketConnection::SocketConnection(SOCKET socket, POnSocketDataReceived2 OnReceived)
{
m_Socket = socket;

m_onReceived = OnReceived;

//Create buffer

ZeroMemory(&m_buffer, sizeof(m_buffer));

m_buffer.buf = new char[MAX_BUFFER_LEN];

m_buffer.len = m_buffer.buf ? MAX_BUFFER_LEN : 0;

//Create overlapped structure

ZeroMemory(&m_ol, sizeof(m_ol));
}

///////////////////////////////////////////////////////////////////////
//
// FUNCTION: SocketConnection::~SocketConnection()
//
///////////////////////////////////////////////////////////////////////

SocketConnection::~SocketConnection()
{
if (m_Socket != NULL)
{
closesocket(m_Socket);
m_Socket = NULL;
}

if(m_buffer.buf != NULL)
{
delete[] m_buffer.buf;
m_buffer.buf = NULL;
}
}

///////////////////////////////////////////////////////////////////////
//
// FUNCTION: SocketConnection::DataReceived(DWORD bytes)
// Process received data
//
///////////////////////////////////////////////////////////////////////

void SocketConnection::DataReceived(DWORD bytes)
{
ProcessOneSet((const BYTE *)m_buffer.buf, bytes);
}


///////////////////////////////////////////////////////////////////////
//
// FUNCTION: ProcessOneSet(const BYTE * data, DWORD dataLength)
// Process one set of data
//
///////////////////////////////////////////////////////////////////////

int SocketConnection::ProcessOneSet(const BYTE *data, DWORD dataLength)
{
DWORD amountProcessed = 0; // running count of how many bytes of data have been processed
DWORD amountUsed = 0; // how many bytes of buffer used by start() or add()
const BYTE *pData = data;

do
{
if ( m_bufferedData.isNoMsg() ) // starting new message
amountUsed = m_bufferedData.start(pData, dataLength-amountProcessed);
else if ( m_bufferedData.isMsgIncomplete() ) // continuing to fill message
amountUsed = m_bufferedData.addData(pData, dataLength-amountProcessed);
else
return(0); // called with a full message. SHOULD NOT DO!

amountProcessed += amountUsed;
pData = data + amountProcessed; // skip over the data that was just used

if ( m_bufferedData.sizeValid() )
{
if ( m_bufferedData.isMsgDone() ) // if message is complete, process it
{
if ( m_onReceived != NULL ) // have a valid pointer to the on received data callback
{
SocketConnection* s = this;
m_onReceived((void**)&s, m_bufferedData.getMessage(), m_bufferedData.dataLength());
}
m_bufferedData.clearMessage(); // clean up message buffer, get ready for a new one
}
}
} while (amountProcessed < dataLength);

return(m_bufferedData.bytesNeeded()); // if not zero then we are leaving with a partially filled message buffer
}

Continue reading...
 

Similar threads

N
Replies
0
Views
101
Neptune123
N
P
Replies
0
Views
174
Policy standard local admin account with Active Di
P
Back
Top