gpt4 book ai didi

c++ - IOCP 的内存使用

转载 作者:行者123 更新时间:2023-11-28 05:05:47 27 4
gpt4 key购买 nike

<分区>

我正在将我们的代码转换为使用 IOCP,并且通信相对稳定,但应用程序的内存使用量在增加。看起来我(在完成函数调用时)返回的 OverlappedEx 对象比我创建的要少得多。我的代码如下。我做错了什么?

#ifndef NETWORK_DATA
#define NETWORK_DATA

#include <afxwin.h>
#include <vector>
#include <string>
#include "CriticalSectionLocker.h"

using namespace std;

DWORD NetworkManager::NetworkThread(void* param)
{
bool bRun = true;


while (bRun)
{
DWORD wait = ::WaitForSingleObject(CCommunicationManager::s_hShutdownEvent, 0);
if (WAIT_OBJECT_0 == wait)
{
bRun = false;
DEBUG_LOG0("Shutdown event was signalled thread");
}
else
{
DWORD dwBytesTransfered = 0;
void* lpContext = nullptr;
OVERLAPPED* pOverlapped = nullptr;

BOOL bReturn = GetQueuedCompletionStatus(s_IOCompletionPort,
&dwBytesTransfered,
(LPDWORD)&lpContext,
&pOverlapped,
INFINITE);
if (nullptr == lpContext)
{
DEBUG_LOG0("invalid context");
/*continue;*/
}
else
{
if (bReturn && dwBytesTransfered > 0)
{
OverlappedEx* data = reinterpret_cast<OverlappedEx*>(pOverlapped);
ServerData* networkData = reinterpret_cast<ServerData*>(lpContext);

if (networkData && data)
{
switch(data->m_opType)
{
case OverlappedEx::OP_READ:
/*DEBUG_LOG4("device name: %s bytes received: %d socket: %d handle: %d",
networkData->Name().c_str(), dwBytesTransfered, networkData->Socket(), networkData->Handle());*/
networkData->CompleteReceive(dwBytesTransfered, data);
break;
case OverlappedEx::OP_WRITE:
/*DEBUG_LOG4("device name: %s bytes sent: %d socket: %d handle: %d",
networkData->Name().c_str(), dwBytesTransfered, networkData->Socket(), networkData->Handle());*/
networkData->CompleteSend(dwBytesTransfered, data);
break;
}
}
}
else
{
/*DEBUG_LOG2("GetQueuedCompletionStatus failed: bReturn: %d dwBytesTransferred: %u", bReturn, dwBytesTransfered);*/
}
}
}
}
return 0;
}


enum NetworkType
{
UDP,
TCP
};

struct OverlappedEx : public OVERLAPPED
{
enum OperationType
{
OP_READ,
OP_WRITE
};

const static int MAX_PACKET_SIZE = 2048;
WSABUF m_wBuf;
char m_buffer[MAX_PACKET_SIZE];
OperationType m_opType;

OverlappedEx()
{
Clear();
m_refCount = 1;
}

void AddRef()
{
::InterlockedIncrement(&m_refCount);
}

void Release()
{
::InterlockedDecrement(&m_refCount);
}

int Refcount() const
{
return InterlockedExchangeAdd((unsigned long*)&m_refCount, 0UL);
}

~OverlappedEx()
{
Clear();
}

void Clear()
{
memset(m_buffer, 0, MAX_PACKET_SIZE);
m_wBuf.buf = m_buffer;
m_wBuf.len = MAX_PACKET_SIZE;
Internal = 0;
InternalHigh = 0;
Offset = 0;
OffsetHigh = 0;
hEvent = nullptr;
m_opType = OP_READ;
}

private:
volatile LONG m_refCount;
};


class ServerData
{
public:
const static int MAX_REVEIVE_QUEUE_SIZE = 100;
const static int MAX_PACKET_SIZE = 2048;
const static int MAX_SEND_QUEUE_SIZE = 10;
const static int MAX_RECEIVE_QUEUE_SIZE = 100;
const static int MAX_OVERLAPPED_STRUCTS = 20;


ServerData(NetworkType netType, const string& sName, CCommunicationManager::CommHandle handle,
SOCKET sock, HANDLE IOPort) :
m_sName(sName)
{
InitializeCriticalSection(&m_receiveQueLock);
InitializeCriticalSection(&m_objectLock);
m_Handle = handle;
m_Socket = sock;
m_nIPAddress = 0;
m_netType = netType;
m_bEnabled = true;
m_ovlpIndex = 0;

for (int i = 0; i < MAX_OVERLAPPED_STRUCTS; ++i)
{
m_olps.push_back(new OverlappedEx);
}

/* Associate socket with completion handle */
if (m_Socket != 0)
{
CreateIoCompletionPort( reinterpret_cast<HANDLE>(m_Socket), IOPort, reinterpret_cast<ULONG_PTR>(this), 0 );
}
}

~ServerData()
{
CriticalSectionLocker lock(&m_receiveQueLock);
DeleteCriticalSection(&m_receiveQueLock);

DeleteCriticalSection(&m_objectLock);
closesocket(m_Socket);
}

const string& Name() const { return m_sName; }
bool Enabled() const { return m_bEnabled; }
void SetEnabled(bool bEnabled)
{
m_bEnabled = bEnabled;
}

int Handle() const { return m_Handle; }
void SetHandle(int handle)
{
m_Handle = handle;
}

unsigned long IPAddress() const { return m_nIPAddress; }

SOCKET Socket() const
{
return m_Socket;
}

void SetSocket(SOCKET sock)
{
m_Socket = sock;
}

void SetIPAddress(unsigned long nIP)
{
m_nIPAddress = nIP;
}

bool ValidTelegram(const vector<char>& telegram) const
{
return false;
}

OverlappedEx* GetBuffer()
{
OverlappedEx* ret = nullptr;

if (!m_olps.empty())
{
ret = m_olps.front();
m_olps.pop_front();
}

return ret;
}


void CompleteReceive(size_t numBytes, OverlappedEx* data)
{
//DEBUG_LOG1("%d buffers are available", AvailableBufferCount());

if (numBytes > 0)
{
vector<char> v(data->m_buffer, data->m_buffer + numBytes);
ReceivedData rd;
rd.SetData(v);
EnqueReceiveMessage(rd);
}

data->Release();
{
CriticalSectionLocker lock(&m_objectLock);
m_olps.push_back(data);
// DEBUG_LOG1("Queue size: %d", m_olps.size());
}

StartReceiving();

}

void CompleteSend(size_t numBytes, OverlappedEx* data)
{

data->Release();
{
CriticalSectionLocker lock(&m_objectLock);
m_olps.push_back(data);
//DEBUG_LOG1("Queue size: %d", m_olps.size());
}

//DEBUG_LOG2("Object: %s num sent: %d", Name().c_str(), numBytes);
}

void StartReceiving()
{
DWORD bytesRecv = 0;
sockaddr_in senderAddr;
DWORD flags = 0;
int senderAddrSize = sizeof(senderAddr);
int rc = 0;

CriticalSectionLocker lock(&m_objectLock);
auto olp = GetBuffer();
if (!olp)
{
if (...)
{
m_olps.push_back(new OverlappedEx);
olp = GetBuffer();
}
else
{
if (...)
{
DEBUG_LOG1("Name: %s ************* NO AVAILABLE BUFFERS - bailing ***************", Name().c_str());
}
return;
}
}

olp->Clear();
olp->m_opType = OverlappedEx::OP_READ;
olp->AddRef();

switch(GetNetworkType())
{
case UDP:
{
rc = WSARecvFrom(Socket(),
&olp->m_wBuf,
1,
&bytesRecv,
&flags,
(SOCKADDR *)&senderAddr,
&senderAddrSize, (OVERLAPPED*)olp, NULL);
}
break;
case TCP:
{
rc = WSARecv(Socket(),
&olp->m_wBuf,
1,
&bytesRecv,
&flags,
(OVERLAPPED*)olp, NULL);
}
break;
}

if (SOCKET_ERROR == rc)
{
DWORD err = WSAGetLastError();
if (err != WSA_IO_PENDING)
{
olp->Release();
m_olps.push_back(olp);
}
}
}


void SetWriteBuf(const SendData& msg, OverlappedEx* data)
{
int len = min(msg.Data().size(), MAX_PACKET_SIZE);
memcpy(data->m_buffer, &msg.Data()[0], len);
data->m_wBuf.buf = data->m_buffer;
data->m_wBuf.len = len;
}

void StartSending(const SendData& msg)
{

DEBUG_LOG1("device name: %s", Name().c_str());

int rc = 0;
DWORD bytesSent = 0;
DWORD flags = 0;
SOCKET sock = Socket();

int addrSize = sizeof(sockaddr_in);

CriticalSectionLocker lock(&m_objectLock);
//UpdateOverlapped(OverlappedEx::OP_WRITE);

auto olp = GetBuffer();
if (!olp)
{
if (...)
{
m_olps.push_back(new OverlappedEx);
olp = GetBuffer();
DEBUG_LOG2("name: %s ************* NO AVAILABLE BUFFERS new size: %d ***************", Name().c_str(), m_olps.size());
}
else
{
if (...)
{
DEBUG_LOG1("Name: %s ************* NO AVAILABLE BUFFERS - bailing ***************", Name().c_str());
}
return;
}
}

olp->Clear();
olp->m_opType = OverlappedEx::OP_WRITE;
olp->AddRef();
SetWriteBuf(msg, olp);

switch(GetNetworkType())
{
case UDP:

rc = WSASendTo(Socket(), &olp->m_wBuf, 1,
&bytesSent, flags, (sockaddr*)&msg.SendAddress(),
addrSize, (OVERLAPPED*)olp, NULL);
break;
case TCP:

rc = WSASend(Socket(), &olp->m_wBuf, 1,
&bytesSent, flags, (OVERLAPPED*)olp, NULL);
break;
}

if (SOCKET_ERROR == rc)
{
DWORD err = WSAGetLastError();
if (err != WSA_IO_PENDING)
{
olp->Release();
m_olps.push_back(olp);
}
}
}

size_t ReceiveQueueSize()
{
CriticalSectionLocker lock(&m_receiveQueLock);
return m_receiveDataQueue.size();
}

void GetAllData(vector <ReceivedData> & data)
{
CriticalSectionLocker lock(&m_receiveQueLock);
while (m_receiveDataQueue.size() > 0)
{
data.push_back(m_receiveDataQueue.front());
m_receiveDataQueue.pop_front();
}
}

void DequeReceiveMessage(ReceivedData& msg)
{
CriticalSectionLocker lock(&m_receiveQueLock);
if (m_receiveDataQueue.size() > 0)
{
msg = m_receiveDataQueue.front();
m_receiveDataQueue.pop_front();
}
}

template <class T>
void EnqueReceiveMessage(T&& data)
{
CriticalSectionLocker lock(&m_receiveQueLock);
if (m_receiveDataQueue.size() <= MAX_RECEIVE_QUEUE_SIZE)
{
m_receiveDataQueue.push_back(data);
}
else
{
static int s_nLogCount = 0;
if (s_nLogCount % 100 == 0)
{
DEBUG_LOG2("Max queue size was reached handle id: %d in %s", Handle(), Name().c_str());
}
s_nLogCount++;
}
}

NetworkType GetNetworkType() const
{
return m_netType;
}

private:
ServerData(const ServerData&);
ServerData& operator=(const ServerData&);
private:
bool m_bEnabled; //!< This member flags if this reciever is enabled for receiving incoming connections.
int m_Handle; //!< This member holds the handle for this receiver.
SOCKET m_Socket; //!< This member holds the socket information for this receiver.
unsigned long m_nIPAddress; //!< This member holds an IP address the socket is bound to.
deque < ReceivedData > m_receiveDataQueue;
CRITICAL_SECTION m_receiveQueLock;
CRITICAL_SECTION m_objectLock;
string m_sName;
NetworkType m_netType;
deque<OverlappedEx*> m_olps;
size_t m_ovlpIndex;
};


#endif

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com