gpt4 book ai didi

javascript - 插入流式 XML 数据数据库

转载 作者:可可西里 更新时间:2023-11-01 10:02:28 26 4
gpt4 key购买 nike

我正在尝试高效地插入大量数据(XML 文件的大小超过 70GB)而不会使我的 MongoDB 服务器崩溃。目前这就是我正在使用 xml-stream 做的事情在 NodeJS 中:

var fs = require('fs'),
path = require('path'),
XmlStream = require('xml-stream'),
MongoClient = require('mongodb').MongoClient,
assert = require('assert'),
ObjectId = require('mongodb').ObjectID,
url = 'mongodb://username:password@my.server:27017/mydatabase',
amount = 0;

var stream = fs.createReadStream(path.join(__dirname, 'motor.xml'));
var xml = new XmlStream(stream);

xml.collect('ns:Statistik');
xml.on('endElement: ns:Statistik', function(item) {
var insertDocument = function(db, callback) {
db.collection('vehicles').insertOne(item, function(err, result) {
amount++;
if (amount % 1000 == 0) {
console.log("Inserted", amount);
}
callback();
});
};

MongoClient.connect(url, function(err, db) {
insertDocument(db, function() {
db.close();
});
});
});

当我调用 xml.on() 时,它基本上返回我当前所在的树/元素。由于这是直接的 JSON,我可以将它作为参数提供给我的 db.collection().insertOne() 函数,它会按照我想要的方式将它插入到数据库中。

所有代码实际上都按现在的方式运行,但在大约 3000 次插入(大约需要 10 秒)后停止。我怀疑这是因为每次我在 XML 文件中看到一棵树时,我都会打开一个数据库连接,插入数据,然后关闭连接,在本例中大约有 3000 次。

我可以以某种方式合并 insertMany() 函数并以 100 秒(或更多)的 block 来执行它,但我不太确定这将如何处理所有流式传输和异步。

所以我的问题是:如何将大量 XML(到 JSON)插入到我的 MongoDB 数据库中而不会崩溃?

最佳答案

你的推测是对的 .insertMany()会比每次都写要好,所以这实际上只是在 "stream" 上收集数据的问题.

由于执行是“异步的”,您通常希望避免在堆栈上有太多的事件调用,所以通常您 .pause() "stream"在调用 .insertMany() 之前然后 .resume()一旦回调完成:

var fs = require('fs'),
path = require('path'),
XmlStream = require('xml-stream'),
MongoClient = require('mongodb').MongoClient,
url = 'mongodb://username:password@my.server:27017/mydatabase',
amount = 0;

MongoClient.connect(url, function(err, db) {

var stream = fs.createReadStream(path.join(__dirname, 'motor.xml'));
var xml = new XmlStream(stream);

var docs = [];
//xml.collect('ns:Statistik');

// This is your event for the element matches
xml.on('endElement: ns:Statistik', function(item) {
docs.push(item); // collect to array for insertMany
amount++;

if ( amount % 1000 === 0 ) {
xml.pause(); // pause the stream events
db.collection('vehicles').insertMany(docs, function(err, result) {
if (err) throw err;
docs = []; // clear the array
xml.resume(); // resume the stream events
});
}
});

// End stream handler - insert remaining and close connection
xml.on("end",function() {
if ( amount % 1000 !== 0 ) {
db.collection('vehicles').insertMany(docs, function(err, result) {
if (err) throw err;
db.close();
});
} else {
db.close();
}
});

});

或者甚至对其进行一些现代化改造:

const fs = require('fs'),
path = require('path'),
XmlStream = require('xml-stream'),
MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://username:password@my.server:27017/mydatabase';

(async function() {

let amount = 0,
docs = [],
db;

try {

db = await MongoClient.connect(uri);

const stream = fs.createReadStream(path.join(__dirname, 'motor.xml')),
xml = new XmlStream(stream);

await Promise((resolve,reject) => {
xml.on('endElement: ns:Statistik', async (item) => {
docs.push(item);
amount++;

if ( amount % 1000 === 0 ) {
try {
xml.pause();
await db.collection('vehicle').insertMany(docs);
docs = [];
xml.resume();
} catch(e) {
reject(e)
}
}

});

xml.on('end',resolve);

xml.on('error',reject);
});

if ( amount % 1000 !== 0 ) {
await db.collection('vehicle').insertMany(docs);
}

} catch(e) {
console.error(e);
} finally {
db.close();
}

})();

请注意,MongoClient 连接实际上包装了所有其他操作。您只想连接一次,其他操作发生在 "stream" 的事件处理程序上.

因此对于您的 XMLStream,事件处理程序在表达式匹配时被触发,数据被提取并收集到一个数组中。每 1000 件 .insertMany()进行调用以插入文档,通过“异步”调用“暂停”和“恢复”。

完成后,将在 "stream" 上触发“结束”事件.这是您关闭数据库连接的地方,事件循环将被释放并结束程序。

虽然可以通过允许各种 .insertMany() 获得某种程度的“并行性”调用一次发生(并且通常为“池化大小”以便不超出调用堆栈),这基本上是进程在等待其他异步 I/O 完成时简单地暂停的最简单形式。

NOTE: Commenting out the .collect() method from your original code as per the follow up question this would appear not to be necessary and in fact is retaining the nodes in memory that really should be discarded after each write to the database.

关于javascript - 插入流式 XML 数据数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45527012/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com