gpt4 book ai didi

node.js - 如何在 node.js 断开连接期间缓冲 MongoDB 插入?

转载 作者:IT老高 更新时间:2023-10-28 13:10:42 27 4
gpt4 key购买 nike

我们确实读取了一个包含大约 500k 元素的 XML 文件(使用 xml-stream),并将它们插入到 MongoDB 中,如下所示:

xml.on(`endElement: product`, writeDataToDb.bind(this, "product"));

插入 writeDataToDb(type, obj) 看起来像这样:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { });

现在,当 Mongo 连接断开时,xml 流仍会读取,并且控制台会充斥着错误消息(无法插入、断开连接、EPIPE 损坏……)。

docs它说:

When you shut down the mongod process, the driver stops processing operations and keeps buffering them due to bufferMaxEntries being -1 by default meaning buffer all operations.

这个缓冲区的实际作用是什么?

我们注意到,当我们插入数据并关闭 mongo 服务器时,这些东西被缓冲了,然后我们恢复了 mongo 服务器, native 驱动程序成功重新连接并且 Node 继续插入数据,但是缓冲的文档(在 mongo beeing 离线期间)做不再插入。

所以我质疑这个缓冲区及其用途。

目标:

我们正在寻找将插入保持在缓冲区中的最佳方法,直到 mongo 返回(根据 wtimeout 在 15000 毫秒内),然后让插入缓冲的文档或使用 xml.pause ();xml.resume() 我们尝试了但没有成功。

基本上,我们需要一些帮助来处理断开连接而不会丢失数据或中断。

最佳答案

使用 insertOne() 插入 500K 元素是一个非常糟糕的主意。您应该改用 bulk operations这允许您在单个请求中插入许多文档。(这里以 10000 为例,因此可以在 50 个单个请求中完成)为避免缓冲问题,您可以手动处理:

  1. 使用 bufferMaxEntries: 0
  2. 禁用缓冲
  3. 设置重连属性:reconnectTries: 30, reconnectInterval: 1000
  4. 创建一个 bulkOperation 并为其提供 10000 个项目
  5. 暂停 xml 阅读器。尝试插入 10000 个项目。如果失败,则每 3000ms 重试一次,直到成功
  6. 如果批量操作在执行过程中中断,您可能会遇到一些重复 ID 问题,请忽略它们(错误代码:11000)

这是一个示例脚本:

var fs = require('fs')
var Xml = require('xml-stream')

var MongoClient = require('mongodb').MongoClient
var url = 'mongodb://localhost:27017/test'

MongoClient.connect(url, {
reconnectTries: 30,
reconnectInterval: 1000,
bufferMaxEntries: 0
}, function (err, db) {
if (err != null) {
console.log('connect error: ' + err)
} else {
var collection = db.collection('product')
var bulk = collection.initializeUnorderedBulkOp()
var totalSize = 500001
var size = 0

var fileStream = fs.createReadStream('data.xml')
var xml = new Xml(fileStream)
xml.on('endElement: product', function (product) {
bulk.insert(product)
size++
// if we have enough product, save them using bulk insert
if (size % 10000 == 0) {
xml.pause()
bulk.execute(function (err, result) {
if (err == null) {
bulk = collection.initializeUnorderedBulkOp()
console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
xml.resume()
} else {
console.log('bulk insert failed: ' + err)
counter = 0
var retryInsert = setInterval(function () {
counter++
bulk.execute(function (err, result) {
if (err == null) {
clearInterval(retryInsert)
bulk = collection.initializeUnorderedBulkOp()
console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
xml.resume()
} else if (err.code === 11000) { // ignore duplicate ID error
clearInterval(retryInsert)
bulk = collection.initializeUnorderedBulkOp()
console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
xml.resume()
} else {
console.log('failed after first try: ' + counter, 'error: ' + err)
}
})
}, 3000) // retry every 3000ms until success
}
})
} else if (size === totalSize) {
bulk.execute(function (err, result) {
if (err == null) {
db.close()
} else {
console.log('bulk insert failed: ' + err)
}
})
}
})
}
})

示例日志输出:

doc 0 : 10000 saved on first try
doc 10000 : 20000 saved on first try
doc 20000 : 30000 saved on first try
[...]
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
doc 130000 : 140000 saved after 4 tries
doc 140000 : 150000 saved on first try
[...]

关于node.js - 如何在 node.js 断开连接期间缓冲 MongoDB 插入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42303008/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com