gpt4 book ai didi

c++ - 尝试使用 Win32 线程进行异步 I/O

转载 作者:塔克拉玛干 更新时间:2023-11-03 01:20:53 24 4
gpt4 key购买 nike

我正在为 Windows 编写串口软件。为了提高性能,我试图将例程转换为使用异步 I/O。我已经编写了代码并且运行得相当好,但我是这方面的半初学者,我想进一步提高程序的性能。在程序的压力测试期间(即以高波特率尽可能快地将数据传入/传出端口),CPU 负载变得相当高。

如果有人在 Windows 中使用过异步 I/O 和多线程,请看一下我的程序,我将不胜感激。我主要担心两个问题:

  • 异步 I/O 是否正确实现?我在网上找到了一些相当可靠的来源,建议您可以将用户数据传递给回调函数,方法是在最后用您自己的数据实现您自己的 OVERLAPPED 结构。这似乎工作得很好,但对我来说确实有点“hackish”。此外,当我从同步/轮询转换为异步/回调时,程序的性能并没有太大提高,这让我怀疑我做错了什么。

  • 为 FIFO 数据缓冲区使用 STL std::deque 是否合理?由于当前编写的程序,在必须处理之前,我一次只允许接收 1 个字节的数据。因为我不知道我会收到多少数据,它可能是无穷无尽的。我假设这个 1-byte-at-time 在必须分配数据时会在 deque 行后面产生缓慢的行为。而且我也不相信双端队列是线程安全的(我应该吗?)。如果使用 STL 双端队列不合理,是否有任何建议可以使用更好的数据类型?基于静态数组的环形缓冲区?

也非常欢迎对代码的任何其他反馈。


串行例程的实现使我有一个名为“Comport”的父类,它处理所有与串行 I/O 相关的事情。我从这个类继承了另一个名为“ThreadedComport”的类,它是一个多线程版本。

ThreadedComport 类(它的相关部分)

/* Multi-threaded variant of Comport: a dedicated transceiver thread drives
 * overlapped (asynchronous) serial I/O via WriteFileEx/ReadFileEx completion
 * callbacks.  FIFO buffering is done with std::deque, guarded by mutexes
 * (std::deque itself is not thread-safe). */
class ThreadedComport : public Comport
{
private:

HANDLE _hthread_port; /* transceiver thread handle (CreateThread) */
HANDLE _hmutex_port; /* serializes access to the COM port handle */
HANDLE _hmutex_send; /* guards _send_buf */
HANDLE _hmutex_rec; /* guards _rec_buf */

deque<uint8> _send_buf; /* outgoing FIFO, producer is an external thread */
deque<uint8> _rec_buf; /* incoming FIFO, consumer is an external thread */
uint16 _data_sent; /* bytes transferred by the last completed write */
uint16 _data_received; /* running count of bytes received */

HANDLE _hevent_kill_thread; /* request transceiver thread shutdown */
HANDLE _hevent_open; /* port was opened: start pumping I/O */
HANDLE _hevent_close; /* port was closed: stop pumping I/O */
HANDLE _hevent_write_done; /* write phase finished (set by callback/skip) */
HANDLE _hevent_read_done; /* read phase finished (set by callback) */
HANDLE _hevent_ext_send; /* notifies external thread */
HANDLE _hevent_ext_receive; /* notifies external thread */

/* OVERLAPPED extended with user data: because `overlapped` is the FIRST
 * member, the LPOVERLAPPED passed to a completion routine can be cast
 * back to this struct to recover the owning object. */
typedef struct
{
OVERLAPPED overlapped;
ThreadedComport* caller; /* add user data to struct */
} OVERLAPPED_overlap;

OVERLAPPED_overlap _send_overlapped; /* async context for writes */
OVERLAPPED_overlap _rec_overlapped; /* async context for reads */
uint8* _write_data; /* heap buffer for the write in flight; freed by _send_callback */
uint8 _read_data; /* one-byte receive buffer (reads are 1 byte at a time) */
DWORD _bytes_read; /* set by _receive_callback, consumed by _wait_for_io */

static DWORD WINAPI _tranceiver_thread (LPVOID param); /* thread entry point */
void _send_data (void); /* issue one overlapped write from _send_buf */
void _receive_data (void); /* issue one overlapped 1-byte read */
DWORD _wait_for_io (void); /* pump events until read+write complete */

/* APC completion routines for WriteFileEx/ReadFileEx. */
static void WINAPI _send_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped);
static void WINAPI _receive_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped);

};

通过CreateThread()创建的主线程例程:

/**
 * Transceiver thread entry point (started via CreateThread).
 *
 * Blocks on the three control events (kill / open / close).  Once the port
 * is open it repeatedly runs one send phase, one receive phase and the
 * event pump (_wait_for_io) until the close or kill event fires.
 *
 * @param param  the owning ThreadedComport instance (CreateThread argument).
 * @return 0 on normal thread termination.
 */
DWORD WINAPI ThreadedComport::_tranceiver_thread (LPVOID param)
{
    ThreadedComport* caller = (ThreadedComport*) param;

    HANDLE handle_array [3] =
    {
        caller->_hevent_kill_thread,    /* index 0 -> WAIT_OBJECT_0     */
        caller->_hevent_open,           /* index 1 -> WAIT_OBJECT_0 + 1 */
        caller->_hevent_close           /* index 2 -> WAIT_OBJECT_0 + 2 */
    };

    DWORD result;

    do
    {
        /* wait for anything to happen */
        result = WaitForMultipleObjects(3,
                                        handle_array,
                                        false,      /* wait for any, not all */
                                        INFINITE);

        /* FIX: the Win32 API only defines WAIT_OBJECT_0; the n-th handle is
         * reported as WAIT_OBJECT_0 + n.  The original code relied on
         * non-standard WAIT_OBJECT_1 / WAIT_OBJECT_2 macros. */
        if(result == WAIT_OBJECT_0 + 1) /* open? */
        {
            do /* while port is open, work */
            {
                caller->_send_data();
                caller->_receive_data();
                /* waits for the same 3 control events as handle_array above,
                 * plus the read/write completion events */
                result = caller->_wait_for_io();

            } while (result != WAIT_OBJECT_0 &&     /* while not kill thread */
                     result != WAIT_OBJECT_0 + 2);  /* while not close port  */
        }
        else if(result == WAIT_OBJECT_0 + 2) /* close while already closed? */
        {
            ; /* do nothing */
        }

    } while (result != WAIT_OBJECT_0); /* loop until kill event */

    return 0;
}

依次调用以下三个函数:

void ThreadedComport::_send_data (void)
{
uint32 send_buf_size;

if(_send_buf.size() != 0) // anything to send?
{
WaitForSingleObject(_hmutex_port, INFINITE);
if(_is_open) // double-check port
{
bool result;

WaitForSingleObject(_hmutex_send, INFINITE);
_data_sent = 0;
send_buf_size = _send_buf.size();
if(send_buf_size > (uint32)_MAX_MESSAGE_LENGTH)
{
send_buf_size = _MAX_MESSAGE_LENGTH;
}
_write_data = new uint8 [send_buf_size];


for(uint32 i=0; i<send_buf_size; i++)
{
_write_data[i] = _send_buf.front();
_send_buf.pop_front();
}
_send_buf.clear();
ReleaseMutex(_hmutex_send);


result = WriteFileEx (_hcom, // handle to output file
(void*)_write_data, // pointer to input buffer
send_buf_size, // number of bytes to write
(LPOVERLAPPED)&_send_overlapped, // pointer to async. i/o data
(LPOVERLAPPED_COMPLETION_ROUTINE )&_send_callback);

SleepEx(INFINITE, true); // Allow callback to come

if(result == false)
{
// error handling here
}

} // if(_is_open)
ReleaseMutex(_hmutex_port);
}
else /* nothing to send */
{
SetEvent(_hevent_write_done); // Skip write
}
}


void ThreadedComport::_receive_data (void)
{
WaitForSingleObject(_hmutex_port, INFINITE);

if(_is_open)
{
BOOL result;

_bytes_read = 0;
result = ReadFileEx (_hcom, // handle to output file
(void*)&_read_data, // pointer to input buffer
1, // number of bytes to read
(OVERLAPPED*)&_rec_overlapped, // pointer to async. i/o data
(LPOVERLAPPED_COMPLETION_ROUTINE )&_receive_callback);

SleepEx(INFINITE, true); // Allow callback to come

if(result == FALSE)
{
DWORD last_error = GetLastError();
if(last_error == ERROR_OPERATION_ABORTED) // disconnected ?
{
close(); // close the port
}
}
}

ReleaseMutex(_hmutex_port);
}



/**
 * Event pump for one send/receive cycle.
 *
 * Waits on the three control events plus the write-done / read-done events
 * until both I/O phases have completed (or a control event aborts the pump).
 * Completed reads are appended to _rec_buf (bounded to _MAX_MESSAGE_LENGTH,
 * oldest bytes dropped) and external listeners are notified.
 *
 * @return the last WaitForMultipleObjects result; the caller checks it
 *         against the kill/close control events.
 */
DWORD ThreadedComport::_wait_for_io (void)
{
    DWORD result;
    bool is_write_done = false;
    bool is_read_done = false;

    HANDLE handle_array [5] =
    {
        _hevent_kill_thread,    /* WAIT_OBJECT_0     */
        _hevent_open,           /* WAIT_OBJECT_0 + 1 */
        _hevent_close,          /* WAIT_OBJECT_0 + 2 */
        _hevent_write_done,     /* WAIT_OBJECT_0 + 3 */
        _hevent_read_done       /* WAIT_OBJECT_0 + 4 */
    };

    do /* COM port message pump running until sending / receiving is done */
    {
        result = WaitForMultipleObjects(5,
                                        handle_array,
                                        false,      /* wait for any, not all */
                                        INFINITE);

        /* FIX: Win32 defines only WAIT_OBJECT_0; handle n is reported as
         * WAIT_OBJECT_0 + n.  The original used non-standard
         * WAIT_OBJECT_2/3/4 macros. */
        if(result <= WAIT_OBJECT_0 + 2)
        {
            break; /* kill / open / close: abort the pump */
        }
        else if(result == WAIT_OBJECT_0 + 3) /* write done */
        {
            is_write_done = true;
            SetEvent(_hevent_ext_send); /* notify external thread */
        }
        else if(result == WAIT_OBJECT_0 + 4) /* read done */
        {
            is_read_done = true;

            if(_bytes_read > 0)
            {
                uint32 errors = 0;

                WaitForSingleObject(_hmutex_rec, INFINITE);
                _rec_buf.push_back((uint8)_read_data);
                _data_received += _bytes_read;

                /* bound the FIFO: drop the oldest bytes past the limit */
                while((uint16)_rec_buf.size() > _MAX_MESSAGE_LENGTH)
                {
                    _rec_buf.pop_front();
                }

                ReleaseMutex(_hmutex_rec);
                _bytes_read = 0;

                ClearCommError(_hcom, &errors, NULL);
                SetEvent(_hevent_ext_receive); /* notify external thread */
            }
        }
    } while(!is_write_done || !is_read_done);

    return result;
}

异步 I/O 回调函数:

/**
 * WriteFileEx completion routine (runs as an APC during SleepEx).
 *
 * Recovers the owning ThreadedComport through the extended OVERLAPPED
 * (the OVERLAPPED is the struct's first member, so the pointer can be
 * cast back to OVERLAPPED_overlap), records the transfer count on
 * success, frees the write buffer and signals the overlapped's event.
 */
void WINAPI ThreadedComport::_send_callback (DWORD dwErrorCode,
                                             DWORD dwNumberOfBytesTransfered,
                                             LPOVERLAPPED lpOverlapped)
{
    ThreadedComport* owner = ((OVERLAPPED_overlap*)lpOverlapped)->caller;

    if(dwErrorCode == 0 && dwNumberOfBytesTransfered > 0)
    {
        owner->_data_sent = dwNumberOfBytesTransfered;
    }

    /* the write buffer is freed here in every case, success or failure */
    delete [] owner->_write_data;

    SetEvent(lpOverlapped->hEvent);
}


/**
 * ReadFileEx completion routine (runs as an APC during SleepEx).
 *
 * On a successful transfer, stores the byte count in the owning object's
 * _bytes_read (the owner is recovered by casting the OVERLAPPED back to
 * the extended OVERLAPPED_overlap struct).  Always signals the
 * overlapped's event so the pump can proceed.
 */
void WINAPI ThreadedComport::_receive_callback (DWORD dwErrorCode,
                                                DWORD dwNumberOfBytesTransfered,
                                                LPOVERLAPPED lpOverlapped)
{
    if(dwErrorCode == 0 && dwNumberOfBytesTransfered > 0)
    {
        OVERLAPPED_overlap* wrap = (OVERLAPPED_overlap*)lpOverlapped;
        wrap->caller->_bytes_read = dwNumberOfBytesTransfered;
    }

    SetEvent(lpOverlapped->hEvent);
}

最佳答案

第一个问题很简单。该方法并不骇人听闻;你拥有 OVERLAPPED内存和它之后的一切。 Raymond Chen 最好地描述了这一点:http://blogs.msdn.com/b/oldnewthing/archive/2010/12/17/10106259.aspx

只有当您在等待 I/O 完成时有更好的事情要做时,您才会期望性能得到提高。如果你所做的只是SleepEx ,您只会看到 CPU% 下降。线索就在名称“重叠”中——它允许您重叠计算和 I/O。

std::deque<unsigned char>可以毫无大问题地处理 FIFO 数据。它可能会回收 4KB 的 block (精确的数字由广泛的分析确定,全部为您完成)。

[编辑]我进一步研究了您的代码,似乎代码不必要地复杂。对于初学者来说,异步 I/O 的主要好处之一是您不需要所有线程的东西。线程允许您使用更多的核心,但您要处理一个慢速的 I/O 设备。即使是单个内核也足够了,如果它不会一直等待。而这正是重叠 I/O 的用途。您只需将一个线程专用于该端口的所有 I/O 工作。因为它是唯一的线程,所以它不需要互斥量来访问该端口。

OTOH,你会想要一个围绕 deque<uint8> 的互斥量对象,因为生产者/消费者线程与 comport 线程不同。

关于c++ - 尝试使用 Win32 线程进行异步 I/O,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5221278/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com