gpt4 book ai didi

javascript - 同时完成子进程和 Promise 决议

转载 作者:太空宇宙 更新时间:2023-11-03 22:26:30 26 4
gpt4 key购买 nike

tl;dr:我编写的函数创建了多个子进程,这些子进程在提交消息中的数据时解决 promise 。尽管该函数将所有这些 Promise 包装在 Promise.All 中,但该函数将突然返回,并且 Promise.all 既不会解析也不会拒绝,即使所有进程都没有错误地完成。有什么想法为什么会发生这种情况吗?

为了加快数据收集过程,我让一个父进程获取一些输入数据(准确地说是在 SQL 数据库中查询的日期)并将其以相同大小的 block 发送到一定数量的子进程,等待 children 通过将结果包装在一个大的 promise 中来完成他们的数据处理。

虽然这适用于较小的数据集,但对于较大的数据集,父 promise 将简单地返回到命令行 - 既不解析也不拒绝甚至继续该函数。查看一些日志后,似乎虽然所有子进程都正确处理并发送数据,但父进程没有收到少数(即 10 个中的 2 个)进程的结果。丢失的消息发生在数据处理接近结束时(当多个子进程大约同时完成并发送消息时)

缩写代码:

// main function
function createArray(i,j) // returns an array of i empty arrays, each of length j
function chunkify(a, n, balanced) // divides array a into n chunks (balancing them in size if true) returning an array of chunks

function kidcollector(snaptimes,course) {
var done = 1;
var numchild = 10
const chunked = chunkify(snaptimes,numchild,true);

// array of numchild promises to be resolved upon arrival of data
var collectedPromises = _.times(numchild).map(i => {
return new Promise((resolve, reject) => {
var child = child_process.fork('./child.js');
// send chunk of data to each child
child.send({
times:chunked[i],
c:course
});

child.on('error', (err) => {
console.log('Child error.');
reject(err)
});

child.on('message', function(m) {
if (m.err) {
console.log('Got error from '+ m.child, m.err);
reject(m.err);
} else {
console.log('recieved data from ' + m.child + '! ' + done + ' out of ' + numchild);
done++;
resolve(m.data);
}

});
});
})

return Promise.all(collectedPromises)
.then(results => {
// compile all data into one array then return it
})
.catch(err => {
console.log("One of the kids messed up:", err);
})
};

// child.js, a separate file

const connString = // it's a secret!
const client = new Client(connString);
client.connect();

client.on('error', (err) => {
console.error('Client error:', err.stack)
})

process.on('exit', (err) => {
if (err) console.log(process.pid + ' has recieved error:', err);
client.end(() => console.log(process.pid + ' has disconnected on process end', err));
})

process.on('disconnect', (err) => {
if (err) console.log(process.pid + ' has recieved error:', err);
client.end(() => console.log(process.pid + ' has disconnected on process disconnect'));
})

process.on('message', function(m) {
collector(m.times,m.c,process.pid) // async function which compiles data across SQL databases
.then(async function(subdata) {
console.log("all done");
await process.send({
child: process.pid,
data: subdata
});
await process.disconnect();
})
.catch(async function(err) {
console.log("FAILED IN CHILD", err)
await process.send({
child: process.pid,
err: err
});
await process.disconnect();
})
});

因此,在按预期运行一段时间后,数据处理接近结束时,日志如下所示:

all done // child says they're done
recieved data from 5486! 5 out of 10 // parent has received their data
5486 has disconnected on process disconnect // child disconnects
5481 processing snaptime #35 at 2017-07-31T20:26:40.322Z // child is now processing a new time from their given array
all done
recieved data from 5478! 6 out of 10
5478 has disconnected on process disconnect
5483 processing snaptime #34 at 2017-07-31T20:26:51.065Z
5485 processing snaptime #35 at 2017-07-31T20:27:01.876Z
all done // child says they're done
5477 has disconnected on process disconnect // child disconnects, but parent hasn't received data
all done
recieved data from 5481! 7 out of 10 // all good here
5481 has disconnected on process disconnect
5483 processing snaptime #35 at 2017-07-31T20:27:47.834Z
all done
5485 has disconnected on process disconnect // didn't receive message here
all done
recieved data from 5483! 8 out of 10
5483 has disconnected on process disconnect
hansy@Hansys-MacBook-Air ~/Documents/GitHub // and we're at the command line...?

在promise.all()解析时,代码应该记录运行时间,在拒绝时,它应该记录其中一个 child 搞砸了及其错误。

关于发生了什么和/或如何解决这个问题有什么想法吗,特别是因为它只发生在较大的数据集上? (我使用的是带有 10 个子进程的 Node v8.0.0)

最佳答案

你的问题似乎是process.send不会返回您可以等待的 promise ,而是接受(可选)回调。因此,您的 disconnect 调用不会等待消息发送。

当队列中没有更多事件需要处理时,即使 Promise 尚未解决,您的父进程也刚刚完成。您想听的是 exit event子进程的错误,而不仅仅是错误错误。当您拒绝时,您可以确保您的Promise.all始终会解决,无论子进程做什么。

我推荐

// parent

new Promise((resolve, reject) => {
const child = child_process.fork('./child.js');
child.on('error', reject);
child.on('exit', reject);
child.on('message', resolve); // should happen before exit

child.send({
times: chunked[i],
c: course
});
}).then(function(m) {
if (m.err) {
console.log(`received error from #${i} (${m.child})`, m.err);
throw m.err;
} else {
console.log(`received data from #${i} (${m.child})`);
return m.data;
}
}, function(err) {
console.log(`Got abort from #${i} (${m.child})`);
throw err;
});

// child

process.on('message', function(m) {
collector(m.times, m.c, process.pid) // async function which compiles data across SQL databases
.then(function(subdata) {
console.log(process.pid+" done");
return {
child: process.pid,
data: subdata
};
}, function(err) {
console.log(process.pid+" FAILED:", err)
return {
child: process.pid,
err: err
};
}).then(function(data) {
return new Promise(function(resolve, reject) {
process.send(data, function(err) {
if (err) reject(err);
else resolve();
});
});
}).catch(function(err) {
console.log(process.pid+" FAILED to send result", err)
}).then(function() {
process.disconnect();
})
});

关于javascript - 同时完成子进程和 Promise 决议,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45441440/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com