gpt4 book ai didi

c++ - 创建原子引用计数的尝试因死锁而失败。这是正确的方法吗?

转载 作者:可可西里 更新时间:2023-11-01 18:32:14 28 4
gpt4 key购买 nike

所以我正在尝试创建写时复制映射,它在读取端尝试使用原子引用计数来避免锁定。

有点不对劲。我看到一些引用变得过度增加,而有些则下降为负值,所以有些东西并不是真正的原子。在我的测试中,我有 10 个读取器线程循环 100 次,每个线程执行 get() 和 1 个写入器线程执行 100 次写入。

它卡在了 writer 中,因为一些引用永远不会归零,即使它们应该归零。

我正在尝试使用铺设的 128 位 DCAS 技术 explained by this blog .

这是否有明显的错误,或者是否有更简单的方法来调试它而不是在调试器中使用它?

typedef std::unordered_map<std::string, std::string> StringMap;

static const int zero = 0; //provides an l-value for asm code

class NonBlockingReadMapCAS {

public:

class OctaWordMapWrapper {
public:
StringMap* fStringMap;
//std::atomic<int> fCounter;
int64_t fCounter;

OctaWordMapWrapper(OctaWordMapWrapper* copy) : fStringMap(new StringMap(*copy->fStringMap)), fCounter(0) { }

OctaWordMapWrapper() : fStringMap(new StringMap), fCounter(0) { }

~OctaWordMapWrapper() {
delete fStringMap;
}

/**
* Does a compare and swap on an octa-word - in this case, our two adjacent class members fStringMap
* pointer and fCounter.
*/
static bool inline doubleCAS(OctaWordMapWrapper* target, StringMap* compareMap, int64_t compareCounter, StringMap* swapMap, int64_t swapCounter ) {
bool cas_result;
__asm__ __volatile__
(
"lock cmpxchg16b %0;" // cmpxchg16b sets ZF on success
"setz %3;" // if ZF set, set cas_result to 1

: "+m" (*target),
"+a" (compareMap), //compare target's stringmap pointer to compareMap
"+d" (compareCounter), //compare target's counter to compareCounter
"=q" (cas_result) //results
: "b" (swapMap), //swap target's stringmap pointer with swapMap
"c" (swapCounter) //swap target's counter with swapCounter
: "cc", "memory"
);
return cas_result;
}



OctaWordMapWrapper* atomicIncrementAndGetPointer()
{

if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
return this;
else
return NULL;
}


OctaWordMapWrapper* atomicDecrement()
{
while(true) {
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter -1))
break;
}
return this;
}

bool atomicSwapWhenNotReferenced(StringMap* newMap)
{
return doubleCAS(this, this->fStringMap, zero, newMap, 0);
}
}
__attribute__((aligned(16)));

std::atomic<OctaWordMapWrapper*> fReadMapReference;
pthread_mutex_t fMutex;


NonBlockingReadMapCAS() {
fReadMapReference = new OctaWordMapWrapper();
}

~NonBlockingReadMapCAS() {
delete fReadMapReference;
}

bool contains(const char* key) {
std::string keyStr(key);
return contains(keyStr);
}

bool contains(std::string &key) {
OctaWordMapWrapper *map;
do {
map = fReadMapReference.load()->atomicIncrementAndGetPointer();
} while (!map);
bool result = map->fStringMap->count(key) != 0;
map->atomicDecrement();
return result;
}

std::string get(const char* key) {
std::string keyStr(key);
return get(keyStr);
}

std::string get(std::string &key) {
OctaWordMapWrapper *map;
do {
map = fReadMapReference.load()->atomicIncrementAndGetPointer();
} while (!map);
//std::cout << "inc " << map->fStringMap << " cnt " << map->fCounter << "\n";
std::string value = map->fStringMap->at(key);
map->atomicDecrement();
return value;
}

void put(const char* key, const char* value) {
std::string keyStr(key);
std::string valueStr(value);
put(keyStr, valueStr);
}

void put(std::string &key, std::string &value) {
pthread_mutex_lock(&fMutex);
OctaWordMapWrapper *oldWrapper = fReadMapReference;
OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
std::pair<std::string, std::string> kvPair(key, value);
newWrapper->fStringMap->insert(kvPair);
fReadMapReference.store(newWrapper);
std::cout << oldWrapper->fCounter << "\n";
while (oldWrapper->fCounter > 0);
delete oldWrapper;
pthread_mutex_unlock(&fMutex);

}

void clear() {
pthread_mutex_lock(&fMutex);
OctaWordMapWrapper *oldWrapper = fReadMapReference;
OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
fReadMapReference.store(newWrapper);
while (oldWrapper->fCounter > 0);
delete oldWrapper;
pthread_mutex_unlock(&fMutex);

}

};

最佳答案

也许不是答案,但我觉得这很可疑:

while (oldWrapper->fCounter > 0);
delete oldWrapper;

当计数器为 0 时,您可以让一个读取器线程刚刚进入 atomicIncrementAndGetPointer(),从而通过删除包装器将地毯拉到读取器线程下面。

编辑以总结以下潜在解决方案的评论:

我知道的最佳实现是将 fCounterOctaWordMapWrapper 移动到 fReadMapReference (您不需要 实际上根本没有 OctaWordMapWrapper 类)。当计数器为零时,交换 writer 中的指针。因为您可能会遇到读取器线程的高争用,这实际上会无限期地阻塞写入器,所以您可以为读取器锁分配 fCounter 的最高位,即,当该位被设置时,读取器会旋转直到该位被清除。编写器在要更改指针时设置此位 (__sync_fetch_and_or()),等待计数器降为零(即现有读者完成工作),然后交换指针并清除位。

这种方法应该是防水的,尽管它显然会阻止读取器写入。我不知道这在您的情况下是否可以接受,理想情况下您希望这是非阻塞的。

代码看起来像这样(未经测试!):

class NonBlockingReadMapCAS
{
public:
NonBlockingReadMapCAS() :m_ptr(0), m_counter(0) {}

private:
StringMap *acquire_read()
{
while(1)
{
uint32_t counter=atom_inc(m_counter);
if(!(counter&0x80000000))
return m_ptr;
atom_dec(m_counter);
while(m_counter&0x80000000);
}
return 0;
}

void release_read()
{
atom_dec(m_counter);
}

void acquire_write()
{
uint32_t counter=atom_or(m_counter, 0x80000000);
assert(!(counter&0x80000000));
while(m_counter&0x7fffffff);
}

void release_write()
{
atom_and(m_counter, uint32_t(0x7fffffff));
}

StringMap *volatile m_ptr;
volatile uint32_t m_counter;
};

只需在访问指针之前和之后调用 acquire/release_read/write() 进行读/写。将 atom_inc/dec/or/and() 替换为 __sync_fetch_and_add()__sync_fetch_and_sub()__sync_fetch_and_or()__sync_fetch_and_and() 分别。实际上,您不需要 doubleCAS()

正如@Quuxplusone 在下面的评论中正确指出的那样,这是单生产者和多消费者实现。我修改了代码以正确断言以执行此操作。

关于c++ - 创建原子引用计数的尝试因死锁而失败。这是正确的方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24793474/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com