
c++ - Thread-safe lock-free memory pool: free function does not behave correctly in multi-thread


I have a simple implementation of a thread-safe allocator of same-size buffers. Internally it is a very simple interlocked singly linked list that reuses the unused space of the unallocated buffers to maintain the list.

I have also written some tests; in single-threaded mode everything seems to work. I have managed to isolate the problem to the Free function, but I cannot find the bug.

I should mention that I ran the same tests with Microsoft's Interlocked Singly Linked Lists and that works, but I would still like to find out what is wrong with my own implementation. I even tried disassembling that code and applying similar intrinsics, but it did not help. (Also note that I do not need to track the number of list entries, which is why I assumed I would not need an interlocked function that exchanges a double-register-sized element, such as InterlockedCompareExchange128 on x64.)
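
For reference, this is roughly what that comparison test looked like: a minimal sketch using the Windows SList API (the wrapper type SListPool and the element-size rounding are my own illustration, not code from the original test):

#include <windows.h>
#include <malloc.h> // _aligned_malloc / _aligned_free

// Fixed pool of same-size buffers kept on an SLIST_HEADER instead of a
// hand-rolled interlocked list. SList entries must be aligned to
// MEMORY_ALLOCATION_ALIGNMENT, hence the aligned allocation and the rounding.
struct SListPool {
    SLIST_HEADER head;
    void *block = nullptr;

    bool Init(size_t count, size_t sizeOfElem)
    {
        sizeOfElem = (sizeOfElem + MEMORY_ALLOCATION_ALIGNMENT - 1) &
                     ~size_t(MEMORY_ALLOCATION_ALIGNMENT - 1);
        if (sizeOfElem < sizeof(SLIST_ENTRY))
        {
            sizeOfElem = sizeof(SLIST_ENTRY);
        }

        InitializeSListHead(&head);

        block = _aligned_malloc(count * sizeOfElem, MEMORY_ALLOCATION_ALIGNMENT);
        if (!block)
        {
            return false;
        }

        for (size_t i = 0; count != i; ++i)
        {
            InterlockedPushEntrySList(&head, reinterpret_cast<PSLIST_ENTRY>(
                static_cast<unsigned char *>(block) + i * sizeOfElem));
        }
        return true;
    }

    void *Alloc() { return InterlockedPopEntrySList(&head); } // nullptr when empty
    void Free(void *p) { InterlockedPushEntrySList(&head, static_cast<PSLIST_ENTRY>(p)); }

    ~SListPool() { if (block) _aligned_free(block); }
};

Used the same way as the allocator below (Init once, then Alloc/Free from the worker threads), this version survives the alloc/free stress test because the SList head carries a sequence counter alongside the pointer.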

Here is the allocator code:

#ifndef _POOLNOLOCK_HPP_
#define _POOLNOLOCK_HPP_

#include <windows.h>

template<size_t TSizeOfElem>
class PoolNoLock {
public:
    PoolNoLock(size_t N) :
        n(N),
        arr(new ELEMENT[n])
    {
        for (size_t i = 0; (n - 1) > i; ++i)
        {
            arr[i].next = &arr[i + 1];
        }
        arr[n - 1].next = nullptr;

        for (size_t i = 0; n > i; ++i)
        {
            arr[i].allocated = false;
        }
    }

    ~PoolNoLock() { delete[] arr; }

    void *Alloc()
    {
        ELEMENT *allocBuff;

        do
        {
            allocBuff = ptrFree;
            if (!allocBuff)
            {
                return nullptr;
            }
        } while (allocBuff != InterlockedCompareExchangePointer(
            reinterpret_cast<void *volatile *>(&ptrFree),
            allocBuff->next,
            allocBuff
        ));

        if (allocBuff->allocated)
        {
            __debugbreak(); //will break here
        }

        allocBuff->allocated = true;

        return &allocBuff->buff;
    }

    void Free(void *Address)
    {
        ELEMENT *const freeBuff = reinterpret_cast<ELEMENT *>(Address);

        if (!freeBuff->allocated)
        {
            __debugbreak();
        }

        freeBuff->allocated = false;

        ELEMENT *cmpFree = ptrFree;
        do
        {
            freeBuff->next = cmpFree;

            ELEMENT *const xchgFree =
                reinterpret_cast<ELEMENT *>(InterlockedCompareExchangePointer(
                    reinterpret_cast<void *volatile *>(&ptrFree),
                    freeBuff,
                    cmpFree
                ));

            if (xchgFree == cmpFree)
            {
                break;
            }

            cmpFree = xchgFree;
        } while (true);
    }

private:
    typedef struct _ELEMENT {
        union {
            _ELEMENT *next;
            unsigned char buff[TSizeOfElem];
        };
        bool allocated; //debug info
    } ELEMENT;

    const size_t n;
    ELEMENT *const arr; //array of list elements

    ELEMENT *volatile ptrFree = &arr[0]; //head of "singly" linked list
};

#endif // _POOLNOLOCK_HPP_

Here is the code I use to stress-test the class. A few notes:

  1. 64 is the maximum number of objects WaitForMultipleObjects can wait on.
  2. The wait in each thread helps create the scenario where as many threads as possible hit the resource at the same time.
  3. The number of threads spawned is exactly equal to the number of elements in the allocator, which is why the alloc-only test works.

#include "PoolNoLock.hpp"
#include <vector>
#include <map>
#include <iostream>

static constexpr size_t N_THREAD = 64;
static constexpr size_t N_TEST_RUN = 4;
static constexpr size_t N_ALLOC_FREE = 1024;

struct ThreadParam {
    PoolNoLock<sizeof(size_t)> *allocator;
    const HANDLE &hStartEvent;
    void *addressAlloc = nullptr;

    ThreadParam(PoolNoLock<sizeof(size_t)> *Allocator, const HANDLE &StartEvent) :
        allocator(Allocator),
        hStartEvent(StartEvent)
    {};
};

template<bool TAllocOnly>
class Test {
public:
    ~Test()
    {
        CloseHandle(hStartEvent);
    }

    bool RunSingle(PoolNoLock<sizeof(size_t)> *Allocator)
    {
        std::vector<ThreadParam> params(N_THREAD, ThreadParam(Allocator, hStartEvent));

        if (TRUE != ResetEvent(hStartEvent))
        {
            return false;
        }

        for (size_t i = 0; N_THREAD != i; ++i)
        {
            handles[i] = CreateThread(nullptr,
                                      0,
                                      reinterpret_cast<PTHREAD_START_ROUTINE>(threadProc),
                                      &params[i],
                                      CREATE_SUSPENDED,
                                      &tids[i]);
            if (!handles[i])
            {
                return false;
            }
        }

        for (HANDLE handle : handles)
        {
            if (1 != ResumeThread(handle))
            {
                return false;
            }
        }

        if (TRUE != SetEvent(hStartEvent))
        {
            return false;
        }

        if ((WAIT_OBJECT_0 + N_THREAD - 1) < WaitForMultipleObjects(N_THREAD, handles, TRUE, INFINITE))
        {
            return false;
        }

        for (size_t i = 0; N_THREAD != i; ++i)
        {
            if (WAIT_OBJECT_0 != WaitForSingleObject(handles[i], 0))
            {
                return false;
            }

            DWORD exitCode;
            if (TRUE != GetExitCodeThread(handles[i], &exitCode))
            {
                return false;
            }

            if (0 != exitCode)
            {
                return false;
            }

            if (TRUE != CloseHandle(handles[i]))
            {
                return false;
            }
        }

        if (TAllocOnly)
        {
            std::map<void *, DWORD> threadAllocations;

            for (size_t i = 0; N_THREAD != i; ++i)
            {
                if (!params[i].addressAlloc)
                {
                    return false;
                }

                if (threadAllocations.end() == threadAllocations.find(params[i].addressAlloc))
                {
                    return false;
                }

                std::pair<std::map<void *, DWORD>::iterator, bool> res =
                    threadAllocations.insert(std::make_pair(params[i].addressAlloc, tids[i]));

                if (!res.second)
                {
                    return false;
                }

                Allocator->Free(params[i].addressAlloc);
            }

            if (N_THREAD != threadAllocations.size())
            {
                return false;
            }
        }

        return false;
    }

    bool RunMultiple()
    {
        for (size_t i = 0; N_TEST_RUN != i; ++i)
        {
            PoolNoLock<sizeof(size_t)> allocator(N_THREAD);
            RunSingle(&allocator);
        }

        return true;
    }

private:
    const HANDLE hStartEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);

    HANDLE handles[N_THREAD] = { nullptr };
    DWORD tids[N_THREAD] = { 0 };

    static DWORD WINAPI ThreadProcAllocOnly(_In_ ThreadParam *Param)
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
        {
            return 2;
        }

        Param->addressAlloc = Param->allocator->Alloc();
        if (!Param->addressAlloc)
        {
            return 3;
        }

        return 0;
    }

    static DWORD WINAPI ThreadProcAllocFree(_In_ ThreadParam *Param)
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
        {
            return 2;
        }

        for (size_t i = 0; N_ALLOC_FREE != i; ++i)
        {
            void *ptrTest = Param->allocator->Alloc();
            if (!ptrTest)
            {
                return 3;
            }

            Param->allocator->Free(ptrTest);
        }

        return 0;
    }

    const LPTHREAD_START_ROUTINE threadProc =
        TAllocOnly
        ? reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocOnly)
        : reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocFree);
};

int main()
{
    Test<true> testAllocOnly0;
    Test<false> TestAllocFree0;

    if (!testAllocOnly0.RunMultiple()) //this test will succeed
    {
        std::cout << "Test failed" << std::endl;
        return 1;
    }
    std::cout << "Alloc-ONLY tests succeeded" << std::endl;

    if (!TestAllocFree0.RunMultiple()) //this test will fail
    {
        std::cout << "Test failed" << std::endl;
        return 1;
    }
    std::cout << "Alloc/free tests succeeded" << std::endl;

    std::cout << "All tests succeeded" << std::endl;

    return 0;
}

Best Answer

The error is in the Alloc() routine. More precisely, in this call:

InterlockedCompareExchangePointer(
    reinterpret_cast<void *volatile *>(&ptrFree),
    allocBuff->next, // <-- !!!
    allocBuff)

There are two operations here: first the CPU reads allocBuff->next through the allocBuff pointer, and then it attempts the CAS on ptrFree. But these two operations are not atomic; the thread can be interrupted between them. By the time you read allocBuff->next, allocBuff may already have been allocated by another thread and next overwritten with garbage (not a valid pointer, for example).

So suppose there are two threads, T#1 and T#2:

  • T#1 reads allocBuff from ptrFree
  • T#2 reads allocBuff from ptrFree
  • T#2 returns allocBuff to the user
  • T#2 (the user) overwrites the contents of allocBuff->next with user data, say -1
  • T#1 reads next = allocBuff->next and gets that user data (-1)
  • T#2 frees allocBuff, pushing it back onto ptrFree
  • T#1's CAS succeeds, because ptrFree points to allocBuff again; ptrFree now points to -1
  • T#2 reads -1 from ptrFree
  • T#2 tries to read from -1
  • crash

Even a single element (a) on the stack and two threads are enough to demonstrate this. Take the same example, with the head (F) and one element (a) on the stack. Initial state: F -> a -> 0

  • T#1 reads a from F
  • T#2 reads a from F
  • T#2 reads 0 from a
  • T#2 writes 0 into F and returns a to the user: F -> 0
  • T#2 (the user) writes -1 into a: a = -1
  • T#1 reads -1 from a
  • T#2 frees a: F -> a -> 0
  • T#1 writes -1 into F and returns a to the user: F -> -1
  • T#2 reads -1 from F
  • T#2 tries to read from -1

Another race is possible here as well. Suppose the list is

F - a - b - c

and you want to pop a and set F to b. But before you do, another thread pops a first, so now:

F - b - c

then it pops b as well:

F - c

and then pushes/frees a:

F - a - c.

Because F points to a again, your CAS succeeds, and you end up building the chain

F - b - garbage

because b is now actually in use.


In any case, your implementation is far from optimal. You do not need a template here: TSizeOfElem only has to be known during initialization and never afterwards. And a stress test does not need many threads so much as it needs delays at the critical points:

void NotifyAllocated(PSINGLE_LIST_ENTRY allocBuff)
{
    allocBuff->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE; // allocated = true;

    WCHAR sz[16], txt[64];
    swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
    swprintf_s(txt, _countof(txt), L"alocated = %p", allocBuff);

    MessageBoxW(0, txt, sz, MB_ICONINFORMATION); // simulate delay !
}

void NotifyCheck(PVOID buf, PCWSTR fmt)
{
    WCHAR sz[16], txt[64];
    swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
    swprintf_s(txt, _countof(txt), fmt, buf);
    MessageBoxW(0, txt, sz, MB_ICONWARNING); // simulate delay !
}

class PoolNoLock
{
    PVOID _arr = 0;                  //array of list elements
    PSINGLE_LIST_ENTRY _ptrFree = 0; //head of "singly" linked list

public:
    bool Init(size_t N, size_t SizeOfElem)
    {
        if (N)
        {
            if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
            {
                SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
            }

            union {
                PUCHAR buf;
                PSINGLE_LIST_ENTRY Next;
            };

            if (buf = new UCHAR[N * SizeOfElem])
            {
                _arr = buf;

                PSINGLE_LIST_ENTRY ptrFree = 0;

                do
                {
                    Next->Next = ptrFree;
                    ptrFree = Next;
                } while (buf += SizeOfElem, --N);

                _ptrFree = ptrFree;
            }

            return true;
        }

        return false;
    }

    ~PoolNoLock()
    {
        if (PVOID buf = _arr)
        {
            delete[] buf;
        }
    }

    void *Alloc()
    {
        PSINGLE_LIST_ENTRY allocBuff, ptrFree = _ptrFree;

        for (;;)
        {
            allocBuff = ptrFree;

            if (!allocBuff)
            {
                return 0;
            }

            NotifyCheck(allocBuff, L"try: %p");

            // access allocBuff->Next !!
            PSINGLE_LIST_ENTRY Next = allocBuff->Next;

            NotifyCheck(Next, L"next: %p");

            ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Next, allocBuff);
            //ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, allocBuff->Next, allocBuff);

            if (ptrFree == allocBuff)
            {
                NotifyAllocated(allocBuff);
                return allocBuff;
            }
        }
    }

    void Free(void *Address)
    {
        PSINGLE_LIST_ENTRY ptrFree = _ptrFree, newFree;

        for ( ; ; ptrFree = newFree)
        {
            reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree;

            newFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Address, ptrFree);

            if (newFree == ptrFree)
            {
                return ;
            }
        }
    }
};

ULONG ThreadTest(PoolNoLock* p)
{
    ULONG n = 2;
    do
    {
        WCHAR sz[16], txt[32];
        swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
        swprintf_s(txt, _countof(txt), L"loop %x", n);
        MessageBoxW(0, txt, sz, MB_OK);
        if (void* buf = p->Alloc())
        {
            p->Free(buf);
        }
    } while (--n);

    return 0;
}

void DemoTest()
{
    PoolNoLock p;
    if (p.Init(1, sizeof(PVOID)))
    {
        ULONG n = 2;
        do
        {
            CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
        } while (--n);
    }

    MessageBoxW(0, 0, L"Wait", MB_OK);
}

This is the same as your code, only optimized. The error is the same - in

ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
    (void**)&_ptrFree, allocBuff->Next, allocBuff);

For testing, and to understand it better, it is clearer to write it as

PSINGLE_LIST_ENTRY Next = allocBuff->Next;

// delay !!

ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
    (void**)&_ptrFree, Next, allocBuff);

To solve this, SLIST_HEADER exists - in effect it combines the pointer to the top of the stack with an operation counter. Below is a correct implementation along the same lines (for when SLIST_HEADER and its API are not used directly):

class PoolNoLock
{
    PVOID _arr = 0; //array of list elements

    struct U
    {
        PSINGLE_LIST_ENTRY ptr; //head of "singly" linked list
        ULONG_PTR allocCount;

        void operator = (U& v)
        {
            ptr = v.ptr;
            allocCount = v.allocCount;
        }

        U(U* v)
        {
            ptr = v->ptr->Next;
            allocCount = v->allocCount + 1;
        }

        U(PSINGLE_LIST_ENTRY ptr, ULONG_PTR allocCount) : ptr(ptr), allocCount(allocCount)
        {
        }

        U() : ptr(0), allocCount(0)
        {
        }

    } u;

    //++ debug
    LONG _allocMiss = 0;
    LONG _freeMiss = 0;
    //-- debug

public:
    bool Init(size_t N, size_t SizeOfElem)
    {
        if (N)
        {
            if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
            {
                SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
            }

            union {
                PUCHAR buf;
                PSINGLE_LIST_ENTRY Next;
            };

            if (buf = new UCHAR[N * SizeOfElem])
            {
                _arr = buf;

                PSINGLE_LIST_ENTRY ptrFree = 0;

                do
                {
                    Next->Next = ptrFree;
                    ptrFree = Next;
                } while (buf += SizeOfElem, --N);

                u.ptr = ptrFree;
                u.allocCount = 0;
            }

            return true;
        }

        return false;
    }

    ~PoolNoLock()
    {
        if (PVOID buf = _arr)
        {
            delete[] buf;
        }
    }

    void *Alloc()
    {
        for (;;)
        {
            U allocBuff = u;

            if (!allocBuff.ptr)
            {
                return 0;
            }

            U Next(&allocBuff);

            if (InterlockedCompareExchange128((LONG64*)&u, Next.allocCount, (LONG64)Next.ptr, (LONG64*)&allocBuff))
            {
                // for debug only
                allocBuff.ptr->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE;

                return allocBuff.ptr;
            }

            // for debug only
            InterlockedIncrementNoFence(&_allocMiss);
        }
    }

    void Free(void *Address)
    {
        for ( ; ; )
        {
            U ptrFree = u;
            U a(reinterpret_cast<PSINGLE_LIST_ENTRY>(Address), ptrFree.allocCount);

            reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree.ptr;

            if (InterlockedCompareExchange128((LONG64*)&u, a.allocCount, (LONG64)a.ptr, (LONG64*)&ptrFree))
            {
                return ;
            }

            // for debug only
            InterlockedIncrementNoFence(&_freeMiss);
        }
    }
};

ULONG ThreadTest(PoolNoLock* p)
{
    ULONG n = 0x10000;
    do
    {
        if (void* buf = p->Alloc())
        {
            p->Free(buf);
        }
    } while (--n);

    return 0;
}

void DemoTest()
{
    PoolNoLock p;
    if (p.Init(16, sizeof(PVOID)))
    {
        ULONG n = 8;
        do
        {
            CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
        } while (--n);
    }

    MessageBoxW(0, 0, L"Wait", MB_OK);
}
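
One caveat worth adding (my own note, not part of the original answer): InterlockedCompareExchange128 requires its destination to be 16-byte aligned, while a plain struct of two pointer-sized members is only guaranteed 8-byte alignment, so depending on where u lands inside the pool object the 128-bit CAS can fault. A minimal sketch of the adjustment, assuming the same member layout as above:

#include <windows.h>

// Assumption (not in the original answer): force 16-byte alignment on the CAS
// target so cmpxchg16b / InterlockedCompareExchange128 never sees a misaligned
// destination, regardless of the surrounding members of PoolNoLock.
struct alignas(16) U
{
    PSINGLE_LIST_ENTRY ptr;  // head of the "singly" linked list
    ULONG_PTR allocCount;    // bumped on every successful pop
};

static_assert(alignof(U) == 16,
              "InterlockedCompareExchange128 needs a 16-byte aligned destination");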

Regarding c++ - Thread-safe lock-free memory pool: free function does not behave correctly in multi-thread, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/72263665/
