- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在实现 lock-free queue Anthony Williams 在“C++ Concurrency in Action”中描述的内容。我将其作为 libcds 的新容器进行测试.弹出和推送测试工作正常。但是多个生产者,多个消费者测试有时会失败。VLD(或 Intel Inspector XE,或 ASan)显示内存泄漏。我通过添加节点析构函数来修复它,但所有元素存在的问题仍然存在。我该如何解决这个问题?谢谢。
Williams 无锁队列:
#include <memory>
template <class T>
class williams_queue
{
public:
williams_queue()
{
counted_node_ptr counted_node;
counted_node.ptr = new node;
counted_node.external_count = 1;
head_.store(counted_node);
tail_.store(head_);
}
williams_queue(const lock_free_queue_mpmc& other) = delete;
williams_queue& operator=(const lock_free_queue_mpmc& other) = delete;
~williams_queue()
{
counted_node_ptr old_head = head_.load();
while (node* const old_node = old_head.ptr)
{
head_.store(old_node->next);
delete old_node;
old_head = head_.load();
}
}
void push(const T& new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail_.load();
while (true)
{
increase_external_count(tail_, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get()))
{
counted_node_ptr old_next = {0};
if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
{
delete new_next.ptr;
new_next = old_next;
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
}
else
{
counted_node_ptr old_next = {0};
if(old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
{
old_next = new_next;
new_next.ptr = new node;
}
set_new_tail(old_tail, old_next);
}
}
}
bool pop(Func f)
{
counted_node_ptr old_head = head_.load(std::memory_order_relaxed);
while (true)
{
increase_external_count(head_, old_head);
node* const ptr = old_head.ptr;
if(ptr == tail_.load().ptr)
{
release_ref( p );
return false;
}
counted_node_ptr next = ptr->next.load();
if (head_.compare_exchange_strong(old_head,next))
{
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
f(res.get());
return true;
}
ptr->release_ref();
}
}
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node_counter
{
unsigned internal_count : 30;
unsigned external_counters : 2;
};
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next;
node()
{
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2;
count.store(new_count);
counted_node_ptr new_next;
new_next.ptr = nullptr;
new_next.external_count = 0;
next.store(new_next);
}
};
static void release_ref(node * p)
{
node_counter old_counter = p->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.internal_count;
}
while(!p->count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if(!new_counter.internal_count && !new_counter.external_counters)
{
delete p;
}
}
private:
void set_new_tail(counted_node_ptr& old_tail,
const counted_node_ptr& new_tail)
{
node* const current_tail_ptr = old_tail.ptr;
while (!tail_.compare_exchange_weak(old_tail, new_tail) &&
old_tail.ptr == current_tail_ptr);
if(old_tail.ptr == current_tail_ptr)
{
free_external_counter(old_tail);
}
else
{
release_ref(current_tail_ptr);
}
}
static void increase_external_count(std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
}
while(!counter.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
static void free_external_counter(counted_node_ptr& old_node_ptr)
{
node* const ptr = old_node_ptr.ptr;
const int count_increase = old_node_ptr.external_count - 2;
node_counter old_counter= ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.external_counters;
new_counter.internal_count += count_increase;
}
while(!ptr->count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if(!new_counter.internal_count && !new_counter.external_counters)
{
delete ptr;
}
}
private:
std::atomic<counted_node_ptr> head_;
std::atomic<counted_node_ptr> tail_;
};
测试结果:
Visual Leak Detector read settings from: D:\Development\COMMON_UTILS\Visual Leak Detector\vld.ini Visual Leak Detector Version 2.5 installed. libcds version 2.1.0 Test started 2017-Jan-31 01:19:03 Using test config file: test-debug.conf System topology: Logical processor count: 4
Queue_ReaderWriter::WilliamsQueue_default
reader count=3 writer count=3 item count=99999...
Item count: 0
Item count: 0
Item count: 0
Post pops: 0
Reader 0 popped count=35822
Reader 1 popped count=32755
Reader 2 popped count=31420
Readers: duration=0.893811, success pop=99997, failed pops=261140
Writers: duration=0.841302, failed push=0d:\development\libcds\tests\unit\queue\queue_reader_writer.cpp(253) : CPPUNIT_CH ECK(nTotalPops + nPostTestPops == nQueueSize: popped=99997 must be 99999); Test consistency of popped sequence...
WARNING: Visual Leak Detector detected memory leaks! ---------- Block 116955 at 0x00DB33D0: 8 bytes ---------- Leak Hash: 0xD835B211, Count: 1, Total 8 bytes Call Stack (TID 2836): ucrtbased.dll!malloc() f:\dd\vctools\crt\vcstartup\src\heap\new_scalar.cpp (19): unit-queue_d.exe!o perator new() + 0x9 bytes d:\development\libcds\cds\container\williams_queue.h (297): unit-queue_d.exe !cds::container::WilliamsQueue::push() d:\development\libcds\tests\unit\queue\queue_reader_writer.cpp (85): unit-qu eue_d.exe!queue::Queue_ReaderWriter::WriterThread >::t est() + 0xF bytes
然后我通过添加节点析构函数和数据删除来修复内存泄漏。但测试失败仍然存在。
测试运行代码
namespace {
static size_t s_nReaderThreadCount = 4;
static size_t s_nWriterThreadCount = 4;
static size_t s_nQueueSize = 100000; // by default 4000000;
struct Value {
size_t nNo;
size_t nWriterNo;
};
}
class Queue_ReaderWriter: public CppUnitMini::TestCase
{
template <class Queue>
class WriterThread: public CppUnitMini::TestThread
{
public:
Queue& m_Queue;
double m_fTime;
size_t m_nPushFailed;
virtual void test()
{
size_t nPushCount = getTest().m_nThreadPushCount;
Value v;
v.nWriterNo = m_nThreadNo;
v.nNo = 0;
m_nPushFailed = 0;
m_fTime = m_Timer.duration();
while ( v.nNo < nPushCount ) {
if (m_Queue.push(v)) {
++v.nNo;
}
else
++m_nPushFailed;
}
m_fTime = m_Timer.duration() - m_fTime;
getTest().m_nWriterDone.fetch_add( 1 );
}
};
template <class Queue>
class ReaderThread: public CppUnitMini::TestThread
{
public:
Queue& m_Queue;
double m_fTime;
size_t m_nPopEmpty;
size_t m_nPopped;
size_t m_nBadWriter;
typedef std::vector<size_t> TPoppedData;
std::vector<TPoppedData> m_WriterData;
virtual void test()
{
m_nPopEmpty = 0;
m_nPopped = 0;
m_nBadWriter = 0;
const size_t nTotalWriters = s_nWriterThreadCount;
Value v;
m_fTime = m_Timer.duration();
while ( true ) {
if ( m_Queue.pop( v ) ) {
++m_nPopped;
if ( /*v.nWriterNo >= 0 &&*/ v.nWriterNo < nTotalWriters )
m_WriterData[ v.nWriterNo ].push_back( v.nNo );
else
++m_nBadWriter;
}
else
++m_nPopEmpty;
if ( m_Queue.empty() ) {
if ( getTest().m_nWriterDone.load() >= nTotalWriters ) {
CPPUNIT_MSG(" Item count: " << m_Queue.size());
if ( m_Queue.empty() )
break;
}
}
}
m_fTime = m_Timer.duration() - m_fTime;
}
};
protected:
size_t m_nThreadPushCount;
atomics::atomic<size_t> m_nWriterDone;
protected:
template <class Queue>
void analyze( CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
{
typedef ReaderThread<Queue> Reader;
typedef WriterThread<Queue> Writer;
size_t nPostTestPops = 0;
{
Value v;
while ( testQueue.pop( v ))
++nPostTestPops;
}
CPPUNIT_MSG(" Post pops: " << nPostTestPops);
double fTimeWriter = 0;
double fTimeReader = 0;
size_t nTotalPops = 0;
size_t nPopFalse = 0;
size_t nPoppedItems = 0;
size_t nPushFailed = 0;
std::vector< Reader * > arrReaders;
for ( CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it ) {
Reader * pReader = dynamic_cast<Reader *>( *it );
if ( pReader ) {
fTimeReader += pReader->m_fTime;
nTotalPops += pReader->m_nPopped;
nPopFalse += pReader->m_nPopEmpty;
arrReaders.push_back( pReader );
CPPUNIT_CHECK_EX( pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter );
size_t nPopped = 0;
for ( size_t n = 0; n < s_nWriterThreadCount; ++n )
nPopped += pReader->m_WriterData[n].size();
CPPUNIT_MSG( " Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped );
nPoppedItems += nPopped;
}
else {
Writer * pWriter = dynamic_cast<Writer *>( *it );
CPPUNIT_ASSERT( pWriter != nullptr );
fTimeWriter += pWriter->m_fTime;
nPushFailed += pWriter->m_nPushFailed;
if ( !boost::is_base_of<cds::bounded_container, Queue>::value ) {
CPPUNIT_CHECK_EX( pWriter->m_nPushFailed == 0,
"writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed );
}
}
}
CPPUNIT_CHECK_EX( nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems );
CPPUNIT_MSG( " Readers: duration=" << fTimeReader / s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse );
CPPUNIT_MSG( " Writers: duration=" << fTimeWriter / s_nWriterThreadCount << ", failed push=" << nPushFailed );
size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
CPPUNIT_CHECK_EX( nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize );
CPPUNIT_CHECK( testQueue.empty() );
}
template <class Queue>
void test()
{
m_nThreadPushCount = s_nQueueSize / s_nWriterThreadCount;
CPPUNIT_MSG( " reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
<< " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..." );
Queue testQueue;
CppUnitMini::ThreadPool pool( *this );
m_nWriterDone.store( 0 );
// Writers must be first
pool.add( new WriterThread<Queue>( pool, testQueue ), s_nWriterThreadCount );
pool.add( new ReaderThread<Queue>( pool, testQueue ), s_nReaderThreadCount );
pool.run();
analyze( pool, testQueue );
CPPUNIT_MSG( testQueue.statistics() );
}
最佳答案
VLD 的堆栈跟踪告诉您内存分配但未释放的位置:WilliamsQueue::push
, header 中的第 297 行。
分配的内存偶尔会泄漏的地方可能是在 old_next = new_next
行中。您将现有的 counted_node_ptr
复制到一个空的,分配一些新内存,然后没有明显的地方可以删除以前分配的内存。
关于c++ - Anthony Williams 无锁队列节点移除,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41946653/
我遇到一种情况,我需要从某个主题读取(正在进行的)消息并将它们放入另一个 Queue 中。我怀疑我是否需要 jms Queue 或者我可以对内存中的 java Queue 感到满意。我将通过同一 jv
队列也是一种操作受限的线性数据结构,与栈很相似。 01、定义 栈的操作受限表现为只允许在队列的一端进行元素插入操作,在队列的另一端只允许删除操作。这一特性可以总结为先进先出(First In
队列的定义 队列(Queue):先进先出的线性表 队列是仅在队尾进行插入和队头进行删除操作的线性表 队头(front):线性表的表头端,即可删除端 队尾(rear):线性表的表尾端,即可插入端 由于这
Redis专题-队列 首先,想一想 Redis 适合做消息队列吗? 1、消息队列的消息存取需求是什么?redis中的解决方案是什么? 无非就是下面这几点: 0、数据可以顺序读
0. 学习目标 栈和队列是在程序设计中常见的数据类型,从数据结构的角度来讲,栈和队列也是线性表,是操作受限的线性表,它们的基本操作是线性表操作的子集,但从数据类型的角度来讲,它们与线性表又有着巨大的不
我想在 redis + Flask 和 Python 中实现一个队列。我已经用 RQ 实现了这样的查询,如果你有 Flask 应用程序和任务在同一台服务器上工作,它就可以正常工作。我想知道是否有可能创
我正在使用 Laravel 5.1,我有一个大约需要 2 分钟来处理的任务,这个任务特别是生成报告...... 现在,很明显,我不能让用户在我接受用户输入的同一页面上等待 2 分钟,而是我应该在后台处
我正在使用 Azure 队列,并且有多个不同的进程从队列中读取数据。 我的系统的构建方式假设每条消息只读取一次。 这个Microsoft article声称 Azure 队列具有至少一次传送保证,这可
我正在创建一个Thread::Queue元素数组。 我这样做是这样的: for (my $i=0; $i new; } 但是,当我在每个队列中填充这样的元素时 $queues[$index]->enq
我试图了解如何将我的 Mercurial 补丁推送到远程存储库(例如 bitbucket.org),而不必先应用它们(实际上提交它们)。我的动机是在最终完成之前首先对我的工作进行远程备份,并且能够与其
我的本地计算机上有一个 Mercurial 队列补丁,我需要与同事共享该补丁,但我不想将其提交到上游存储库。有没有一种简单的方法可以打包该补丁并与他分享? 最佳答案 mq 将补丁作为不带扩展名的文
Java 中是否有任何类提供与 Queue 相同的功能,但有返回对象的选项,并且不要删除它,只需将其设置在集合末尾? 最佳答案 Queue不直接提供这样的方法。但是,您可以使用 poll 和 add
我在Windows上使用Tortoise svn客户端,我需要能够一次提交来自不同子文件夹的更改文件-一次提交。像在提交之前将文件添加到队列中之类的?我该怎么做? Windows上是否还有另一个svn
好吧,我正在尝试对我的 DSAQueue 类进行单元测试,它显示我的 isEmpty()、isFull() 和 dequeue() 方法失败。 以下是我的 DSAQueue 代码。我认为我的 Dequ
我想尽量减少对传入请求的数据库查询。它目前需要写入 6 个不同的表。在返回响应之前不需要完成处理。因此,我考虑了 laravel 队列,但我想知道我是否也可以摆脱写入队列/作业表所需的单独查询。我可以
我正在学习队列数据结构。我想用链表创建队列。我想编程输出:10 20程序输出:队列为空-1 队列为空-1 我哪里出错了? 代码如下: class Node { int x; Node next
“当工作人员有空时,他们会根据主题的优先级列表从等待请求池中进行选择。在时间 t 到达的所有请求都可以在时间 t 进行分配。如果两名工作人员同时有空,则安排优先权分配给最近的工作最早安排的人。如果仍然
我正在开发一个巨大的应用程序,它使用一些子菜单、模式窗口、提示等。 现在,我想知道在此类应用程序中处理 Esc 和单击外部事件的正确方法。 $(document).keyup(function(e)
所以 如果我有一个队列 a --> b --> NULL; 当我使用函数时 void duplicate(QueueNodePtr pHead, QueueNodePtr *pTail) 它会给 a
我正在尝试为键盘输入实现 FIFO 队列,但似乎无法让它工作。我可以让键盘输入显示在液晶显示屏上,但这就是我能做的。我认为代码应该读取键盘输入并将其插入队列,然后弹出键盘输入并将值读取到液晶屏幕上。有
我是一名优秀的程序员,十分优秀!