gpt4 book ai didi

javascript - Node.js:可控并发while循环

转载 作者:太空宇宙 更新时间:2023-11-04 01:30:35 25 4
gpt4 key购买 nike

我有一个包含 170 万条记录的 mongodb 集合。每条记录都是一个 ID 号。我需要读取每个 ID 号,向另一个服务发出一些请求,转换数据,将其写入不同的集合,如果一切成功,则删除原始 ID 记录。

我想要一个脚本,它可以无限期地执行这些操作,直到集合为空,并具有可指定的并发性(即任何时候最多 3 个请求)。

通常我会使用 Bluebird 的 map,它可以指定并发 Promise 的数量,但没有输入数组(除非我将所有输入记录读入内存,但我不会这样做)。

我想要的本质上是一个并发的 while 循环,即:(伪 javascript)

promiseWhile(queueNotEmpty, 3){
readFromQueue
.then(doc => {
return process(doc);
})
.then(result => {
if(result == "empty") // (or whatever)
queueNotEmpty = false;
});
}

最佳答案

您可以使用mongodb的游标来异步迭代所有记录。为了让三个工作人员处理它,请将任务包装到异步函数中并多次调用:

 const cursor = db.collection("records").find({});

async function process() {
while(await cursor.hasNext()) {
const record = await cursor.next();
//...
}
}

await Promise.all([ process(), process(), process() ]);

(不过,我不确定 mongodb 驱动程序是否支持并发调用 .next(),您应该测试一下)

<小时/>

否则这个信号量实现可能会有所帮助:

 function Semaphore(count = 1) {
const resolvers = [];
let startCount = count;

return {
aquire() {
return new Promise(resolve => {
if(startCount) { resolve(); startCount -= 1; }
else resolvers.push(resolve);
});
},
free() {
if(resolvers.length) resolvers.pop()();
else startCount += 1;
},
async use(cb) {
await this.aquire();
await cb();
this.free()
},
async done() {
await Promise.all(Array.from({ length: count }, () => this.aquire()));
startCount = count;
},
};
}

Running Demo在您的情况下,它可用作:

 const connectionSemaphore = Semaphore(3);

(async fuction() {
while(await cursor.hasNext()) {
const record = await cursor.next();
/*await*/ connectionSemaphore.use(async () => {
// Do connection stuff concurrently
});
}

await connectionSemaphore.done();
})();

关于javascript - Node.js:可控并发while循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56263994/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com