gpt4 book ai didi

c++ - 条件变量的通知丢失

转载 作者:行者123 更新时间:2023-11-30 05:41:56 25 4
gpt4 key购买 nike

目前我正在使用 std::threads 编写某种 Fork/Join 模式。因此,我为 std::thread 编写了一个包装类,它为所有子级使用一个引用计数器。

每当子进程完成执行时,引用计数器就会减少,并向所有等待的线程发送通知。等待线程等待引用计数器变为 0,这意味着所有子线程都已完成执行。

不幸的是,似乎有时会错过通知。我已经使用 gdb 调试了程序,它告诉我最深的阻塞线程中的引用计数器实际上已经为 0,但它没有识别它。

该类称为 ThreadAttachment:

/**
* \brief For the \p ThreadScheduler the attachment object is a thread itself since for each task a single thread is created.
*
* Children management is required for the fork/join model. It is realized by using an atomic reference counter.
* The reference counter is initially set or changed dynamically by threadsafe operations.
* It is decreased automatically whenever a child task finishes its execution.
*/
class ThreadAttachment : public Attachment
{
public:
/**
* Creates a new thread attachment without creating the actual thread nor starting it.
* \param task The thread attachment is created for the corresponding task \p task.
*/
ThreadAttachment(Task *task);
virtual ~ThreadAttachment();

/**
* Sets the counter of the child tasks.
* \note Threadsafe.
*/
void setChildCount (int count);
/**
* Increments the counter of the child tasks by one.
* \note Threadsafe.
*/
void incrementChildCount();
/**
* Decrements the counter of the child tasks by one.
*
* Besides it notifies \ref m_childrenConditionVariable for all threads which means that all threads which are calling \ref joinChildren() are being awakened.
* \note Threadsafe.
*/
void decrementChildCount();
/**
* \return Returns the counter of the child tasks.
* \note Threadsafe.
*/
int childCount();
/**
* Joins all added children thread attachments.
* Waits for notifications of \ref m_childrenConditionVariable if the counter of child tasks is not already 0.
* Checks on each notification for the counter to become 0. If the counter is finally 0 it stops blocking and continues the execution.
*/
void joinChildren();

/**
* Allocates the actualy std::thread instance which also starts the thread immdiately.
* The thread executes the corresponding task safely when executed itself by the operating systems thread scheduler.
* \note This method should only be called once.
*/
void start();

/**
* Joins the previously with \ref start() allocated and started std::thread.
* If the std::thread is already done it continues immediately.
*/
void join();

/**
* Detaches the previously with \ref start() allocated and started std::thread.
* This releases the thread as well as any control.
*/
void detach();

private:
/**
* The thread is created in \ref start().
* It must be started after all attachment properties have been set properly.
*/
std::unique_ptr<std::thread> m_thread;
/**
* This mutex protects concurrent operations on \ref m_thread.
*/
std::mutex m_threadMutex;
/**
* A reference counter for all existing child threads.
* If this value is 0 the thread does not have any children.
*/
std::atomic_int m_childrenCounter;
/**
* This mutex is used for the condition variable \ref m_childrenConditionVariable when waiting for a notification.
*/
std::mutex m_childrenConditionVariableMutex;
/**
* This condition variable is used to signal this thread whenever one of his children finishes and its children counter is decreased.
* Using this variable it can wait in \ref join() for something to happen.
*/
std::condition_variable m_childrenConditionVariable;
};

方法 start() 启动线程:

void ThreadAttachment::start()
{
/*
* Use one single attachment object only once for one single task.
* Do not recycle it to prevent confusion.
*/
assert(this->m_thread.get() == nullptr);
ThreadAttachment *attachment = this;

/*
* Lock the mutex to avoid data races on writing the unique pointer of the thread which is not threadsafe itself.
* When the created thread runs it can write data to itself safely.
* It is okay to lock the mutex in the method start() since the creation of the thread does not block.
* It immediately returns to the method start() in the current thread.
*/
std::mutex &mutex = this->m_threadMutex;
{
std::lock_guard<std::mutex> lock(mutex);

/*
* The attachment should stay valid until the task itself is destroyed.
* So it can be passed safely.
*
* http://stackoverflow.com/a/7408135/1221159
*
* Since this call does not block and the thread's function is run concurrently the mutex will be unlocked and then the thread can acquire it.
*/
this->m_thread.reset(new std::thread([attachment, &mutex]()
{
/*
* Synchronize with the thread's creation.
* This lock will be acquired after the method start() finished creating the thread.
* It is used as simple barrier but should not be hold for any time.
* Otherwise potential deadlocks might occur if multiple locks are being hold especially in decreaseParentsChildrenCounter()
*/
{
std::lock_guard<std::mutex> lock(mutex);
}

attachment->correspondingTask()->executeSafely();

/*
* After spawning and joining in the task's logic there should be no more children left.
*/
assert(attachment->childCount() == 0);

/*
* Finally the children counter of the parent task has to be decreased.
* This has to be done by the scheduler since it is a critical area (access of the different attachments) and therefore must be locked.
*/
ThreadScheduler *scheduler = dynamic_cast<ThreadScheduler*>(attachment->correspondingTask()->scheduler());
assert(scheduler);
scheduler->decreaseParentsChildrenCounter(attachment);
}));
}
}

这是 ThreadScheduler 类的 decreaseParentsChildrenCounter() 方法:

void ThreadScheduler::decreaseParentsChildrenCounter(ThreadAttachment *attachment)
{
{
std::lock_guard<std::mutex> lock(this->m_mutex);

Task *child = attachment->correspondingTask();

assert(child != nullptr);

Task *parent = child->parent();

if (parent != nullptr)
{
Attachment *parentAttachment = this->attachment(parent);
assert(parentAttachment);
ThreadAttachment *parentThreadAttachment = dynamic_cast<ThreadAttachment*>(parentAttachment);
assert(parentThreadAttachment);
/*
* The parent's children counter must still be greater than 0 since this child is still missing.
*/
assert(parentThreadAttachment->childCount() > 0);
parentThreadAttachment->decrementChildCount();
}
}
}

它基本上是为父线程调用 decrementChildCount()。

方法 joinChildren() 等待所有 child 完成:

void ThreadAttachment::joinChildren()
{
/*
* Since the condition variable is notified each time the children counter is decremented
* it will always awake the wait call.
* Otherwise the predicate check will make sure that the parent thread continues work.
*/
std::unique_lock<std::mutex> l(this->m_childrenConditionVariableMutex);
this->m_childrenConditionVariable.wait(l,
[this]
{
/*
* When the children counter reached 0 no more children are executing and the parent can continue its work.
*/
return this->childCount() == 0;
}
);
}

这些是原子计数器操作,如您所见,每当值减少时我都会发送通知:

void ThreadAttachment::setChildCount(int counter)
{
this->m_childrenCounter = counter;
}

void ThreadAttachment::incrementChildCount()
{
this->m_childrenCounter++;
}

void ThreadAttachment::decrementChildCount()
{
this->m_childrenCounter--;

/*
* The counter should never be less than 0.
* Otherwise it has not been initialized properly.
*/
assert(this->childCount() >= 0);

/*
* Notify all thread which call joinChildren() which should usually only be its parent thread.
*/
this->m_childrenConditionVariable.notify_all();
}

int ThreadAttachment::childCount()
{
return this->m_childrenCounter.load();
}

作为测试用例,我使用 Fork/Join 模式递归地计算斐波那契数。我认为如果通知被错过,它应该检查谓词并检测子计数器为 0。显然值变为 0 那么怎么会错过呢?

最佳答案

更新影响条件的变量(在本例中为成员 count)在对应于条件的互斥体的锁内(this->m_childrenConditionVariableMutex )。

参见 this answer原因。

关于c++ - 条件变量的通知丢失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30940747/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com