gpt4 book ai didi

c++ - 使用线程池并行化函数使其变慢 : why?

转载 作者:搜寻专家 更新时间:2023-10-31 02:03:14 27 4
gpt4 key购买 nike

我在数据库上工作,而不是在 RocksDB 上运行。我有一个 find 函数,它接受一个查询参数,遍历数据库中的所有文档,并返回与查询匹配的文档。我想并行化此函数,以便将工作分散到多个线程上。

为此,我尝试使用 ThreadPool : 我把循环的代码移到一个lambda中,并为每个文档在线程池中添加了一个任务。循环后,每个结果都由主线程处理。

当前版本(单线程):

void
EmbeDB::find(const bson_t& query,
DocumentPtrCallback callback,
int32_t limit,
const bson_t* projection)
{
int32_t count = 0;
bson_error_t error;
uint32_t num_query_keys = bson_count_keys(&query);
mongoc_matcher_t* matcher = num_query_keys != 0
? mongoc_matcher_new(&query, &error)
: nullptr;

if (num_query_keys != 0 && matcher == nullptr)
{
callback(&error, nullptr);
return;
}

bson_t document;
rocksdb::Iterator* it = _db->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next())
{
const char* bson_data = (const char*)it->value().data();
int bson_length = it->value().size();
std::vector<char> decrypted_data;
if (encryptionEnabled())
{
decrypted_data.resize(bson_length);
bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
bson_data = decrypted_data.data();
}
bson_init_static(&document, (const uint8_t*)bson_data, bson_length);

if (num_query_keys == 0 || mongoc_matcher_match(matcher, &document))
{
++count;

if (projection != nullptr)
{
bson_error_t error;
bson_t projected;
bson_init(&projected);

mongoc_matcher_projection_execute_noop(
&document,
projection,
&projected,
&error,
NULL
);

callback(nullptr, &projected);
}
else
{
callback(nullptr, &document);
}

if (limit >= 0 && count >= limit)
{
break;
}
}
}
delete it;

if (matcher)
{
mongoc_matcher_destroy(matcher);
}
}

新版本(多线程):

void
EmbeDB::find(const bson_t& query,
DocumentPtrCallback callback,
int32_t limit,
const bson_t* projection)
{
int32_t count = 0;
bool limit_reached = limit == 0;
bson_error_t error;
uint32_t num_query_keys = bson_count_keys(&query);
mongoc_matcher_t* matcher = num_query_keys != 0
? mongoc_matcher_new(&query, &error)
: nullptr;

if (num_query_keys != 0 && matcher == nullptr)
{
callback(&error, nullptr);
return;
}

auto process_document = [this, projection, num_query_keys, matcher](const char* bson_data, int bson_length) -> bson_t*
{
std::vector<char> decrypted_data;
if (encryptionEnabled())
{
decrypted_data.resize(bson_length);
bson_length = decrypt_data(bson_data, bson_length, decrypted_data.data(), _encryption_method, _encryption_key, _encryption_iv);
bson_data = decrypted_data.data();
}

bson_t* document = new bson_t();

bson_init_static(document, (const uint8_t*)bson_data, bson_length);

if (num_query_keys == 0 || mongoc_matcher_match(matcher, document))
{
if (projection != nullptr)
{
bson_error_t error;
bson_t* projected = new bson_t();
bson_init(projected);

mongoc_matcher_projection_execute_noop(
document,
projection,
projected,
&error,
NULL
);

delete document;

return projected;
}
else
{
return document;
}
}
else
{
delete document;

return nullptr;
}

};

const int WORKER_COUNT = std::max(1u, std::thread::hardware_concurrency());

ThreadPool pool(WORKER_COUNT);
std::vector<std::future<bson_t*>> futures;

bson_t document;
rocksdb::Iterator* db_it = _db->NewIterator(rocksdb::ReadOptions());
for (db_it->SeekToFirst(); db_it->Valid(); db_it->Next())
{
const char* bson_data = (const char*)db_it->value().data();
int bson_length = db_it->value().size();

futures.push_back(pool.enqueue(process_document, bson_data, bson_length));
}
delete db_it;

for (auto it = futures.begin(); it != futures.end(); ++it)
{
bson_t* result = it->get();

if (result)
{
count += 1;

if (limit < 0 || count < limit)
{
callback(nullptr, result);
}

delete result;
}
}

if (matcher)
{
mongoc_matcher_destroy(matcher);
}
}

  • 对于简单的文档和查询,单线程版本在我的机器上0.5 秒 处理了 100 万个文档。
  • 对于相同的文档和查询,多线程版本在 3.3 秒 内处理 100 万个文档。

令人惊讶的是,多线程版本要慢得多。此外,我测量了执行时间,75% 的时间花在了 for 循环上。所以基本上 futures.push_back(pool.enqueue(process_document, bson_data, bson_length)); 行占用了 75% 的时间。

我做了以下事情:

  • 我检查了 WORKER_COUNT 的值,在我的机器上是 6。
  • 我尝试添加 futures.reserve(1000000),认为可能 vector 重新分配有问题,但它没有改变任何东西。
  • 我尝试删除动态内存分配 (bson_t* document = new bson_t();),结果并没有显着改变。

所以我的问题是:多线程版本比单线程版本慢是我做错了什么吗?

我目前的理解是线程池的同步操作(当任务入队和出队时)只是消耗了大部分时间,解决方案是更改数据结构。想法?

最佳答案

并行化有开销。

在单线程版本中处理每个文档大约需要 500 纳秒。要将工作委托(delegate)给线程池(既要委托(delegate)工作又要同步它),必须完成大量簿记工作,而所有这些簿记工作很可能需要每个作业超过 500 纳秒。

假设您的代码是正确的,那么簿记每个作业大约需要 2800 纳秒。为了从并行化中获得显着的加速,您需要将工作分成更大的 block 。

我建议尝试一次处理 1000 个批处理的文档。每个 future 将对应 1000 个文档,而不是仅对应 1 个文档。

其他优化

如果可能,避免不必要的复制。如果某些东西被复制了一堆,看看你是否可以通过引用而不是通过值来捕获它。

关于c++ - 使用线程池并行化函数使其变慢 : why?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55807718/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com