gpt4 book ai didi

javascript - 多次同时调用 `cursor.next()`导致驱动崩溃

转载 作者:可可西里 更新时间:2023-11-01 09:48:12 35 4
gpt4 key购买 nike

动机:

我有一个架构涉及许多像这样“消费”文档的工作人员:

worker.on('readyForAnotherDoc', () => worker.consume( await cursor.next() ));

这是一种伪代码 - 我正在检查真实代码中的 cursor.hasNext()。有数百个工作人员,因此 cursor.next() 可能会同时被突然爆发的 200 个请求击中。

我正在尝试解决 mongodb node.js 驱动程序中的一个错误/怪癖,如果我对 cursor.next() 的请求太多,就会导致错误彼此“重叠”巧合的是。

背景:

似乎 MongoDB Node.js 驱动程序没有正确处理 cursor.next 向其抛出大量请求的情况。尝试运行这段代码:

(async function() {

// create a collection for testing:
let db = await require('mongodb').MongoClient.connect('mongodb://localhost:27017/tester-db-478364');
await db.collection("test").drop();
for(let i = 0; i < 1000; i++) {
await db.collection("test").insertOne({num:i, foo:'bar'});
}

let cursor = await db.collection("test").find({});

async function go() {
let doc = await cursor.next();
console.log(doc.num);
}

// start 100 simulataneous requests to `cursor.next()`
for(let i = 0; i < 1000; i++) {
go();
}

})();

这是它为我输出的内容:

0
1
2
3
4
5
6
7
8
9
/home/joe/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410
if(workItem.socketTimeout) {
^

TypeError: Cannot read property 'socketTimeout' of null
at Connection.messageHandler (/home/me/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410:16)
at Socket.<anonymous> (/home/me/Downloads/testtt/node_modules/mongodb-core/lib/connection/connection.js:361:20)
at emitOne (events.js:115:13)
at Socket.emit (events.js:210:7)
at addChunk (_stream_readable.js:252:12)
at readableAddChunk (_stream_readable.js:239:11)
at Socket.Readable.push (_stream_readable.js:197:10)
at TCP.onread (net.js:589:20)

所以在当前批处理用完之前看起来一切正常。但这很奇怪,因为如果您在 .find({}) 之后添加 .batchSize(100),那么它不会修复它。但有趣的是,如果您添加 .batchSize(5),您会得到:

0
1
2
3
4
0
1
2
3
/home/joe/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410
if(workItem.socketTimeout) {
^

TypeError: Cannot read property 'socketTimeout' of null
etc...

不知道那里发生了什么......

尝试解决它:

但假设我们此时正在寻找变通办法。假设我们稍微更改了 go 函数:

let cursorBusy = false;
async function go() {
if(cursorBusy) await waitForCursor();
cursorBusy = true;
let doc = await cursor.next();
cursorBusy = false;
console.log(doc.num);
}
function waitForCursor() {
return new Promise(resolve => {
let si = setInterval(() => {
if(!cursorBusy) {
resolve();
clearInterval(si);
}
}, 50);
});
}

这会导致一个新的错误,它似乎出现在各处的 console.log(doc.num) 中:

...
359
415
466
(node:16259) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 12): MongoError: clientcursor already in use? driver problem?
427
433
459
...

我认为这并不能避免错误,因为 setInterval 存在某种“竞争条件”。有趣的是,这是一条不同的错误消息。

问题:有什么方法可以检测光标当前是否“忙”?在修复此错误(如果它甚至是错误)之前,这里还有其他潜在的解决方法吗?

This question有一些相似(但绝对不相同)的行为,并且similar issues似乎出现在第三方 node.js 库中。

最佳答案

您的列表中有一些错误。所以真的只是稍微清理一下:

const MongoClient = require('mongodb').MongoClient;

(async function() {

let db;

try {
db = await MongoClient.connect('mongodb://localhost/test');

await db.collection('test').drop();

await db.collection('test').insertMany(
Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' }))
);

// This is not async. It returns immediately
let cursor = db.collection('test').find();

async function go() {
let doc = await cursor.next(); // This awaits before continuing. Not concurrent.
console.log(doc.num);
}

for ( let i = 0; i < 100; i++ ) {
go(); // Note that these "await" internally
}

} catch(e) {
console.error(e);
} finally {
db.close();
}

})();

真正防弹它,那么您真的应该等待每个操作。因此,在返回时添加 Promise.resolve()await go() 作为良好的衡量标准,并通过减小批量大小来强制打破条件:

const MongoClient = require('mongodb').MongoClient;

(async function() {

let db;

try {
db = await MongoClient.connect('mongodb://localhost/test');

await db.collection('test').drop();

await db.collection('test').insertMany(
Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' }))
);

let cursor = db.collection('test').find().batchSize(1);

async function go() {
let doc = await cursor.next();
console.log(doc.num);
return Promise.resolve();
}

for ( let i = 0; i < 100; i++ ) {
await go();
}

console.log('done');


} catch(e) {
console.error(e);
} finally {
db.close();
}


})();

正确地按顺序打印出来。缩短了,但实际上达到了预期的 99:

0
1
2
3
4
5
6
7
8
9
10
(etc..)

解释主要在代码的注释中,您似乎遗漏了哪些是异步,哪些是不是

因此返回一个 Cursor来自.find() 不是 async 方法,并立即返回。这是因为它只是一个操作句柄,此时不执行任何操作。 MongoDB 驱动程序(所有驱动程序)不会联系服务器或在该端建立游标,直到发出“获取”数据的实际请求。

当您调用 .next() 时是在与服务器进行实际通信并返回“一批”结果时。 “批处理”实际上只影响后续调用是否实际返回服务器或不检索数据,因为“批处理”可能已经有“更多”结果,可以在另一个“批处理”请求之前“清空” "制成。无论如何,每次调用.next() 视为异步,无论是否有外部 I/O。

通常您会调用 .hasNext()包装每个迭代(也是 async )因为调用 .next()Cursor 上没有更多结果是错误的。它通常也是一种“循环控制”的方式,如下所示:

(async function() {

let db;

try {
db = await MongoClient.connect('mongodb://localhost/test');

await db.collection('test').drop();

await db.collection('test').insertMany(
Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' }))
);

let cursor = db.collection('test').find();

async function go() {
let doc = await cursor.next();
console.log(doc.num);
}

//for ( let i = 0; i < 100; i++ ) {
while( await cursor.hasNext() ) { // Check the cursor still has results
go();
}

} catch(e) {
console.error(e);
} finally {
db.close();
}

})();

然后变化到循环直到光标结束。

关于“并发”的注意事项还在于,它通常不是您在这里所期望的。如果您确实想并行发出多个请求,那么您仍然需要等待当前游标提取。如果您不这样做,那么您就是在要求服务器针对所有请求返回相同数据,而不是“迭代”游标时的顺序数据。

您似乎将其与一些实用函数(尤其是 Mongoose asyncEach())在实现并行“获取”时所做的事情混淆了。代码(来自内存)基本上是人为地插入一个setTimeout() 以等待“下一个滴答声”,这基本上意味着每个 .next() 都必须实际触发。

如前所述,它是“人工的”,因为批处理只是有效地 .map()(在底层代码中的某处)变成了一个更大的批处理。

但正如所证明的那样。由于实际“等待”每个 .next(),基本预期用途确实按预期工作。就像你应该做的那样。

关于javascript - 多次同时调用 `cursor.next()`导致驱动崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46780632/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com