c++ - MongoDB insert_many performance - C++


I am currently trying to maximize the write speed of inserts into MongoDB from a C++ application. The behavior I see is that the insert_many() operations slow down, which causes a write queue to build up, so each subsequent insert_many() has even more documents to insert. I wrote a small sample application to demonstrate the issue. It consists of two threads:

  1. The main thread reads a dictionary file (one word per line), counts the frequency of each letter in the word, pushes the per-letter results into a vector, and then signals the worker thread.
  2. The worker thread swaps a thread-safe double buffer, then iterates over the vector, turning each element into a document. After iterating the whole vector, it performs a bulk insert for each letter (collection).

struct CountData {
    CountData(const size_t p_index, const std::string& p_word, const size_t p_count)
        : index(p_index)
        , word(p_word)
        , count(p_count)
    {
    }

    const size_t index = 0;
    const std::string word;
    const int32_t count = 0;
};

struct CollectionData {
    CollectionData(const std::string& collectionName) : name(collectionName) {
        options.ordered(false);
        auto writeConcern = mongocxx::write_concern{};
        writeConcern.acknowledge_level(mongocxx::write_concern::level::k_unacknowledged);
        options.write_concern(writeConcern);
    }

    void push_back(const bsoncxx::document::value& value) { documents.push_back(value); }
    size_t size() const { return documents.size(); }

    void writeAll(mongocxx::pool& pool) {
        auto client = pool.acquire();
        auto collection = (*client)["frequency"][name];
        collection.insert_many(documents, options);
    }
    void clear() { documents.clear(); }

private:
    const std::string name;
    mongocxx::options::insert options;
    std::vector<bsoncxx::document::value> documents;
};

class FrequencyCounter {
public:
    FrequencyCounter(const std::string& mongoUri, const std::string& dictionaryFile)
        : _collectionNames({ "A", "B", "C", "D", "E", "F", "G", "H", "I",
                             "J", "K", "L", "M", "N", "O", "P", "Q", "R",
                             "S", "T", "U", "V", "W", "X", "Y", "Z" })
        , _mongoPool(mongocxx::uri(mongoUri))
        , _dictionary(dictionaryFile)
    {
        for(const auto& name : _collectionNames) {
            _collections.push_back(name);
        }
        _thread = std::thread(&FrequencyCounter::workerThread, this);
    }

    ~FrequencyCounter() {
        _isRunning = false;
        _event.notify_one();
        _thread.join();
    }

    void Run() {
        std::ifstream inFile(_dictionary);
        if(!inFile.is_open()) {
            std::cerr << "Could not open definition file: " << _dictionary << std::endl;
            std::exit(-1);
        }
        std::string line;

        while(std::getline(inFile, line)) {
            std::string word = line;
            std::transform(word.begin(), word.end(), word.begin(), ::toupper);
            size_t index = 0;
            for(const auto& letter : _collectionNames) {
                size_t count = std::count(word.begin(), word.end(), letter[0]);
                if(count > 0)
                    _dataQueue.addPending(CountData(index, word, count));
                ++index;
            }
            _event.notify_one();
        }
    }

private:
    void writeData(const bool flush=false) {
        if(!_dataQueue.trySwap())
            return; // No data to write
        const auto& dataQueue = _dataQueue.active();
        for(const auto& data : dataQueue) {
            const uint64_t begin = DateTime::now();
            auto doc = bsoncxx::builder::basic::document{};
            doc.append(bsoncxx::builder::basic::kvp("word", data.word));
            doc.append(bsoncxx::builder::basic::kvp("count", data.count));
            _collections[data.index].push_back(doc.extract());
            const uint64_t end = DateTime::now();
            _docCreationTimes.emplace_back(end - begin);
        }

        for(auto& collection : _collections) {
            const size_t currentSize = collection.size();
            if(flush || currentSize >= _maxDocQueueSize) {
                const uint64_t begin = DateTime::now();
                collection.writeAll(_mongoPool);
                const uint64_t end = DateTime::now();
                _docInsertionTimes.emplace_back(end - begin);
                collection.clear();
            }
        }
    }

    void workerThread() {
        try {
            while(_isRunning) {
                _event.wait();
                _event.reset();
                writeData();
            }
            const bool flush = true;
            writeData(flush);
        } catch(const std::exception& ex) {
            std::cerr << "Exception in thread: " << ex.what();
        }
        _isRunning = false;
        {
            uint64_t minTime = std::numeric_limits<uint64_t>::max();
            uint64_t maxTime = 0;
            uint64_t sumTime = 0;
            uint64_t count = 0;
            for(const auto& time : _docCreationTimes) {
                if(time < minTime)
                    minTime = time;
                if(time > maxTime)
                    maxTime = time;
                sumTime += time;
                ++count;
            }
            std::cout << "Doc Creation Time (avg): " << lPadd(std::to_string(sumTime / count), '0', 12) << "ns" << std::endl;
            std::cout << "Doc Creation Time (min): " << lPadd(std::to_string(minTime), '0', 12) << "ns" << std::endl;
            std::cout << "Doc Creation Time (max): " << lPadd(std::to_string(maxTime), '0', 12) << "ns" << std::endl;
        }
        {
            uint64_t minTime = std::numeric_limits<uint64_t>::max();
            uint64_t maxTime = 0;
            uint64_t sumTime = 0;
            uint64_t count = 0;
            for(const auto& time : _docInsertionTimes) {
                if(time < minTime)
                    minTime = time;
                if(time > maxTime)
                    maxTime = time;
                sumTime += time;
                ++count;
            }
            std::cout << "Doc Insertion Time (avg): " << lPadd(std::to_string(sumTime / count), '0', 12) << "ns" << std::endl;
            std::cout << "Doc Insertion Time (min): " << lPadd(std::to_string(minTime), '0', 12) << "ns" << std::endl;
            std::cout << "Doc Insertion Time (max): " << lPadd(std::to_string(maxTime), '0', 12) << "ns" << std::endl;
        }
    }

    const size_t _maxDocQueueSize = 10;
    const std::vector<std::string> _collectionNames;
    mongocxx::instance _mongoInstance;
    mongocxx::pool _mongoPool;
    std::string _dictionary;
    std::vector<CollectionData> _collections;
    AtomicVector<CountData> _dataQueue; // thread-safe double buffer
    std::vector<uint64_t> _docCreationTimes;
    std::vector<uint64_t> _docInsertionTimes;

    Event _event;
    volatile bool _isRunning = true;
    std::thread _thread;
};

int main(int argc, char* argv[]) {
    const std::string mongoUri = "mongodb://localhost:27017/?minPoolSize=50&maxPoolSize=50";
    const std::string dictionary = "words_alpha.txt";
    FrequencyCounter counter(mongoUri, dictionary);
    counter.Run();

    return 0;
}
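
The listing above relies on a few helpers that are not shown (AtomicVector, Event, DateTime::now() and lPadd()). For readers who want to reproduce the test, here is a minimal sketch of the double buffer and event; the names and exact semantics are assumptions, not the asker's actual implementations:

#include <condition_variable>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical double buffer: producers append to a "pending" vector,
// the consumer swaps it into the "active" vector when it wants to drain.
template <typename T>
class AtomicVector {
public:
    void addPending(T item) {
        std::lock_guard<std::mutex> lock(_mutex);
        _pending.push_back(std::move(item));
    }
    // Returns false when there is nothing new to consume.
    bool trySwap() {
        std::lock_guard<std::mutex> lock(_mutex);
        if(_pending.empty())
            return false;
        _active.clear();
        std::swap(_active, _pending);
        return true;
    }
    const std::vector<T>& active() const { return _active; } // consumer thread only

private:
    std::mutex _mutex;
    std::vector<T> _pending;
    std::vector<T> _active;
};

// Hypothetical manual-reset event built on a condition variable.
class Event {
public:
    void notify_one() {
        { std::lock_guard<std::mutex> lock(_mutex); _signaled = true; }
        _cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return _signaled; });
    }
    void reset() {
        std::lock_guard<std::mutex> lock(_mutex);
        _signaled = false;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _signaled = false;
};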

Results:

Doc Creation Time (avg): 000,000,000,837ns
Doc Creation Time (min): 000,000,000,556ns
Doc Creation Time (max): 000,015,521,675ns
Doc Insertion Time (avg): 000,087,038,560ns
Doc Insertion Time (min): 000,000,023,311ns
Doc Insertion Time (max): 005,407,689,435ns

I have tried the following changes without success:

  • Creating a single Mongo client that stays open for the lifetime of the FrequencyCounter
  • Acquiring a client from the pool and calling insert_one() for each item in the vector, both with and without the pool (see the sketch after this list)
  • Using a separate database for each letter, still with the pool and insert_many()
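
For reference, the insert_one() variant amounted to something like the following sketch of CollectionData::writeAll() (an illustration of the attempt, not the exact code that was tried):

// Sketch of the insert_one() attempt: acquire a client from the pool and
// insert each buffered document individually instead of as one batch.
void writeAll(mongocxx::pool& pool) {
    auto client = pool.acquire();
    auto collection = (*client)["frequency"][name];
    for(const auto& doc : documents) {
        collection.insert_one(doc.view());
    }
}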

Are there any optimizations or changes I can make so that the worker thread can keep up with the high throughput of the main thread?

Best Answer

I realize this is a relatively old question, but you may see a performance improvement by using collection.bulk_write(bulk_write &bulk_write) to insert the records in your worker thread.

Bulk writes are created by appending a series of operations (mongocxx::model::insert_one, mongocxx::model::delete_one, etc.) to an instance of mongocxx::bulk_write (class reference docs), and the prepared batch is then executed with collection.bulk_write(bulk_write).
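
In outline, the pattern looks like the sketch below. The names here are placeholders: `documents` stands for any std::vector of bsoncxx::document::value and `collection` for a mongocxx::collection; newer mongocxx releases create the batch via collection.create_bulk_write() instead of constructing mongocxx::bulk_write directly.

// Minimal bulk-insert sketch (v3.0-era API, matching the benchmark code below).
mongocxx::options::bulk_write bulk_opt;
bulk_opt.ordered(false);                      // unordered batches can be applied more efficiently

mongocxx::bulk_write bulk{bulk_opt};
for(const auto& doc : documents) {            // documents: std::vector<bsoncxx::document::value>
    bulk.append(mongocxx::model::insert_one{doc.view()});
}
collection.bulk_write(bulk);                  // one round trip for the whole batch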

Some nice examples can be found here


Performance comparison:

Test 1:

Inserted 100000 in 27263651us insert_one
Inserted 100000 in 1129957us insert_many
Inserted 100000 in 916561us insert_bulk

Test 2:

Inserted 100000 in 28196463us insert_one
Inserted 100000 in 1089758us insert_many
Inserted 100000 in 967773us insert_bulk

These numbers were obtained with the code snippet below (note: mongocxx driver v3.0.3, MongoDB v3.2):

struct msg {
    long num;
    long size;
    long time;
};

//using insert_one()
void store_msg_one(std::vector<msg> lst)
{
    for(int i = 0; i < lst.size(); i++)
    {
        msg cur_msg = lst[i];
        bsoncxx::builder::stream::document msg_info_builder{};
        msg_info_builder << "msg_num" << cur_msg.num
                         << "msg_size" << cur_msg.size
                         << "msg_time" << cur_msg.time;
        bsoncxx::document::value doc_val = msg_info_builder << bsoncxx::builder::stream::finalize;

        collection.insert_one(doc_val.view());
    }
}

//using insert_many()
void store_msg_many(std::vector<msg> lst)
{
    std::vector<bsoncxx::document::value> lst2;
    for(int i = 0; i < lst.size(); i++)
    {
        msg cur_msg = lst[i];
        bsoncxx::builder::stream::document msg_info_builder{};
        msg_info_builder << "msg_num" << cur_msg.num
                         << "msg_size" << cur_msg.size
                         << "msg_time" << cur_msg.time;
        bsoncxx::document::value doc_val = msg_info_builder << bsoncxx::builder::stream::finalize;
        lst2.push_back(doc_val);
    }
    collection.insert_many(lst2);
}

//using bulk_write()
void store_msg_bulk(std::vector<msg> lst)
{
    mongocxx::options::bulk_write bulk_opt;
    mongocxx::write_concern wc;
    bulk_opt.ordered(false); //see https://docs.mongodb.com/manual/core/bulk-write-operations/
    wc.acknowledge_level(mongocxx::write_concern::level::k_default);
    bulk_opt.write_concern(wc);

    mongocxx::bulk_write bulk = mongocxx::bulk_write{bulk_opt};
    for(int i = 0; i < lst.size(); i++)
    {
        msg cur_msg = lst[i];
        bsoncxx::builder::stream::document msg_info_builder{};
        msg_info_builder << "msg_num" << cur_msg.num
                         << "msg_size" << cur_msg.size
                         << "msg_time" << cur_msg.time;

        bsoncxx::document::value doc_val = msg_info_builder << bsoncxx::builder::stream::finalize;
        mongocxx::model::insert_one msg_info_insert_op{doc_val.view()};
        bulk.append(msg_info_insert_op);
    }
    collection.bulk_write(bulk);
}

int main()
{
    std::vector<msg> lst;
    int num_msg = 100000;
    for(int i = 0; i < num_msg; i++)
    {
        msg info;
        info.time = 20*i;
        info.num = i;
        info.size = sizeof(i);
        lst.push_back(info);
    }

    //Test with insert_one(...)
    long long start_microsecs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
    store_msg_one(lst);
    long long end_microsecs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
    std::cout << "Inserted " << num_msg << " in " << end_microsecs - start_microsecs << "us" << " insert_one(...)" << std::endl;

    //Test with insert_many(...)
    start_microsecs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
    store_msg_many(lst);
    end_microsecs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
    std::cout << "Inserted " << num_msg << " in " << end_microsecs - start_microsecs << "us" << " insert_many(...)" << std::endl;

    //Test with bulk_write(...)
    start_microsecs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
    store_msg_bulk(lst);
    end_microsecs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
    std::cout << "Inserted " << num_msg << " in " << end_microsecs - start_microsecs << "us" << " insert_bulk(...)" << std::endl;
    std::cin.ignore();
    return 0;
}

Note: see the MongoDB docs for more information about bulk_write options.

Edit: formatting

Regarding c++ - MongoDB insert_many performance - C++, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46917489/
