gpt4 book ai didi

node.js - NodeJS集群: how to reduce data from workers in master?

转载 作者:太空宇宙 更新时间:2023-11-03 22:09:14 24 4
gpt4 key购买 nike

我是nodejs的新手,我想要的是从数据库读取数据并进行计算。为了使其更快,我使用nodejs集群模块。

有两个全局变量:pairMap和nameSet,我将作业分配给master进程中的worker,它们执行一些计算工作(修改map和set,就像map-reduce一样)

但是,似乎pairMap和nameSet没有被修改并且为空。 (doMasterAction中的代码)(另一个奇怪的事情是我控制台数据,它确实修改了,但最终在主进程中返回为空)。

数据如下(我摘录主要思想):

const Promise = require('bluebird');
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
const fs = Promise.promisifyAll(require('fs'))

const utils = {
mergeMap:(source,dest)=>{
for(let [key,value] of Object.entries(source)){
if(!dest.has(key)) dest.set(key,value);
for(let [type,arr] of Object.entries(value)){
const final = new Set([...dest.get[key][type],...arr])
dest.get[key][type] = final;
}
}
}
}


/**
* key: name1@group.com||name2@group.com
* value: {to: [id1,id2,id3],cc,bcc}
* @param row
* @param map
* @param nameSet
*/
function countLinks(res,map,nameSet) {
nameSet.add(res);
map.set(res,{ 'test': Math.floor(Math.random()*10+1)});
}


class hackingTeamPrepare {

constructor(bulk=100000,total = 1150000){
this.bulk = bulk;
this.count = Math.ceil(total / this.bulk);
const parallelArr = new Array(this.count).fill(0).map((v,i)=> i);
this.jobs = parallelArr.map(v=> 'key'+v);
this.pairMap = new Map();
this.nameSet = new Set();

this.bindThis();
}

bindThis(){
this.doWorkerAction = this.doWorkerAction.bind(this);
this.doMasterAction = this.doMasterAction.bind(this);
}

doMasterAction() {
const workers = [],result = {};
const self = this;
let count = 0,timeout;

for(let i=0;i<numCPUs;i++){
const worker = cluster.fork();
workers[i] = worker;
}
cluster.on('online', (worker) => {
worker.send(self.jobs.shift());
});
cluster.on('exit', function() {
if(self.jobs.length===0) return;
console.log('A worker process died, restarting...');
});

cluster.on('message',function (senderWorkder,info) {
const { workerId,jobIndex } = info;
result[jobIndex] = true;
console.log(`----worker ${workerId} done job: ${jobIndex}----`);

const finish = !self.jobs.length && Object.keys(result).length===self.count;
if(finish){
// -----------------!!here!!--------------------------**
console.log('-------finished-------',self.pairMap,self.nameSet); // Map {}, Set {}
for(let id in cluster.workers){
const curWorker = cluster.workers[id];
curWorker.disconnect();
}
}else{
if(!self.jobs.length) return;
senderWorkder.send(self.jobs.shift());
}
})
}



/**
* {[person1,person2]: {to,cc,bcc}}
*/
doWorkerAction() {
//Process为worker, receive from master
const self = this;
process.on('message',(sql)=>{
const jobPromise = Promise.resolve(sql).then(res => {
countLinks(res,self.pairMap,self.nameSet);
const data = {
workerId: process.pid,
jobIndex: sql,
}

// send to master
process.send(data);
}).catch(err=> {
console.log('-----query error----',err)
});
})
}

readFromPG(){
if(cluster.isMaster){
this.doMasterAction();
}else if (cluster.isWorker){
this.doWorkerAction();
}
}

init(){
this.readFromPG();
}
}

const test = new hackingTeamPrepare(2,10);
test.init();

有人可以帮我解决这个问题吗?

我尝试在master进程中手动合并数据,但是worker.send发送的数据似乎忽略了其中的对象。

最佳答案

在Node.js集群中,内存中的对象在master和worker之间不共享。

pairMapnameSet分别存在于master和每个worker中。当worker修改这些对象时,它们在同一个worker(进程)中发生变化,而在master和其他worker中保持不变。

为了让你的想法发挥作用,你需要在主进程中维护一个pairMap和一个nameSet,发送包含你需要从worker到master的任何数据的消息,并使用接收到的数据更新这些对象。

请注意,您不能将任何对象作为消息从工作线程传递到主线程。如果您需要一些复杂的数据,则需要发送纯 JavaScript 对象(键值对)。例如,如果您需要将 Map 实例从工作 Node 发送到主 Node ,请参阅取自 here 的以下函数:

// source - http://2ality.com/2015/08/es6-map-json.html
function mapToJson(map) {
return JSON.stringify([...map]);
}
function jsonToMap(jsonStr) {
return new Map(JSON.parse(jsonStr));
}

// send message using this example:
process.send(mapToJson(pairMap));

// receive message:
worker.on('message', message => console.log(jsonToMap(message)))

关于node.js - NodeJS集群: how to reduce data from workers in master?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47377651/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com