gpt4 book ai didi

c++ - 如何使用 POSIX 线程实现阻塞读取

转载 作者:塔克拉玛干 更新时间:2023-11-03 00:01:07 28 4
gpt4 key购买 nike

我想实现一个遵循大致如下接口(interface)的生产者/消费者场景:

class Consumer {
private:
vector<char> read(size_t n) {
// If the internal buffer has `n` elements, then dequeue them
// Otherwise wait for more data and try again
}
public:
void run() {
read(10);
read(4839);
// etc
}
void feed(const vector<char> &more) {
// Safely queue the data
// Notify `read` that there is now more data
}
};

在这种情况下,feedrun 将在不同的线程上运行,而 read 应该是阻塞读取(如 recvfread)。显然,我的双端队列需要某种互斥,并且我需要某种通知系统来通知 read 重试。

我听说条件变量是可行的方法,但我所有的多线程经验都来自于 Windows,我很难全神贯注于它们。

感谢您的帮助!

(是的,我知道返回 vector 效率低下。我们不谈这个。)

最佳答案

此代码尚未准备好投入生产。不会对任何库调用的结果进行错误检查。

我在 LockThread 中封装了互斥锁的锁定/解锁,因此它是异常安全的。但仅此而已。

此外,如果我认真地这样做,我会将互斥量和条件变量包装在对象中,这样它们就不会在 Consumer 的其他方法中被滥用。但是只要您注意到必须在使用条件变量(以任何方式)之前获取锁,那么这个简单的情况就可以保持原样。

出于兴趣,您是否检查过 boost 线程库?

#include <iostream>
#include <vector>
#include <pthread.h>

class LockThread
{
public:
LockThread(pthread_mutex_t& m)
:mutex(m)
{
pthread_mutex_lock(&mutex);
}
~LockThread()
{
pthread_mutex_unlock(&mutex);
}
private:
pthread_mutex_t& mutex;
};
class Consumer
{
pthread_mutex_t lock;
pthread_cond_t cond;
std::vector<char> unreadData;
public:
Consumer()
{
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&cond,NULL);
}
~Consumer()
{
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&lock);
}

private:
std::vector<char> read(size_t n)
{
LockThread locker(lock);
while (unreadData.size() < n)
{
// Must wait until we have n char.
// This is a while loop because feed may not put enough in.

// pthread_cond() releases the lock.
// Thread will not be allowed to continue until
// signal is called and this thread reacquires the lock.

pthread_cond_wait(&cond,&lock);

// Once released from the condition you will have re-aquired the lock.
// Thus feed() must have exited and released the lock first.
}

/*
* Not sure if this is exactly what you wanted.
* But the data is copied out of the thread safe buffer
* into something that can be returned.
*/
std::vector<char> result(n); // init result with size n
std::copy(&unreadData[0],
&unreadData[n],
&result[0]);

unreadData.erase(unreadData.begin(),
unreadData.begin() + n);
return (result);
}
public:
void run()
{
read(10);
read(4839);
// etc
}
void feed(const std::vector<char> &more)
{
LockThread locker(lock);

// Once we acquire the lock we can safely modify the buffer.
std::copy(more.begin(),more.end(),std::back_inserter(unreadData));

// Only signal the thread if you have the lock
// Otherwise race conditions happen.
pthread_cond_signal(&cond);

// destructor releases the lock and thus allows read thread to continue.
}
};


int main()
{
Consumer c;
}

关于c++ - 如何使用 POSIX 线程实现阻塞读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/206857/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com