gpt4 book ai didi

javascript - 如何在所有线程运行完毕后运行代码?

转载 作者:行者123 更新时间:2023-12-03 12:19:01 25 4
gpt4 key购买 nike

我有一个多线程网络爬虫,它会下载网页并将其存储到数据库中(整个过程大约需要 4 分钟)。为了加快爬取速度,我使用了 Node.js 的 cluster 模块。但我遇到了一个问题:我希望在所有线程都完成各自的工作之后,才进入 while 循环的下一次迭代,而不是在线程刚启动时就立即进入下一次迭代。如何确保所有线程都已结束后再继续?

这是主while循环中的相关代码:

// Question's original crawl loop. Each iteration splits `queue` into one chunk
// per worker and forks the workers; each worker crawls its chunk and exits.
// The problem being asked about: the master branch below never awaits anything,
// so it moves straight on to the next iteration instead of waiting for workers.
// NOTE(review): cluster.fork() creates separate processes (not threads) that
// re-run this script, so `indexSize += 1` inside a worker only changes that
// worker's own copy; nothing in the master branch ever updates `indexSize`.
while (indexSize !== indexSizeLimit) {
const queueLength = queue.length;
// At most one worker per queued URL, capped at the CPU count.
const numberOfThreads = Math.min(numberOfCPUs, queueLength);
// threadAllocations[i] = number of queue entries worker i will receive.
const threadAllocations = Array(numberOfThreads).fill(0);
let queuesAllocated = 0;
const queueChunks = [];

// Fills `queueChunks` with contiguous slices of `queue`, sized by handing out
// one entry at a time to each worker in turn (sizes differ by at most one).
// NOTE(review): if `queue` is empty, numberOfThreads is 0, the inner `for`
// never runs, the break is never reached and `while (true)` never ends.
function fillQueueChunks() {
loop: while (true) {
for (let i = 0; i < numberOfThreads; i++) {
threadAllocations[i] += 1;
queuesAllocated += 1;

if (queuesAllocated === queueLength) {
break loop;
};
};
};

let start = 0;

// Turn the per-worker counts into consecutive slices of `queue`.
for (let threadAllocation of threadAllocations) {
const end = start + threadAllocation;

queueChunks.push(queue.slice(start, end));

start = end;
};
};

fillQueueChunks();

// Find out how to make multithreading finish, and then move on with the loop.
if (cluster.isMaster) {
// Master: fork one worker per chunk. Nothing here waits for them, so the
// while loop immediately continues with its next iteration.
for (let i = 0; i < numberOfThreads; i++) {
cluster.fork();
};
} else {
// Worker: pick this worker's chunk by its cluster id.
// NOTE(review): assumes worker ids run 1..numberOfThreads. Ids are unique per
// fork and are not reset between batches, so this only lines up for the first
// batch. The chunks were also computed from this worker's own copy of `queue`.
const chunk = queueChunks[cluster.worker.id - 1];

await Promise.all(chunk.map(function (url) {
// NOTE(review): async Promise executor — if request() or save() rejects,
// resolve() is never called and this promise never settles.
return new Promise(async function (resolve) {
const webcode = await request(url);

if (webcode !== "Failure") {
// Only increments this worker process's copy of indexSize.
indexSize += 1;

const document = new Document(url, webcode);
const hrefs = document.hrefs();
const hrefsQuery = Query(hrefs);
// Also make sure it is not included in indexed webpages.
const hrefIndividualized = hrefsQuery.individualize();

// No-op expression statement; placeholder for the queue handling below.
hrefIndividualized;

// Do something with hrefIndividualized in regards to maintaining a queue in the database.
// And in adding a nextQueue which to replace the queue in code with.

await document.save();
};

resolve("Written");
});
}));

// Chunk finished: terminate this worker process.
process.exit(0);
};
};

最佳答案

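将线程包装在一个 promise 中:在父进程(master)中监听 worker 的 `disconnect` 事件,当断开连接的 worker 数量等于线程数时,就 resolve 这个 promise。注意:cluster 模块创建的是独立的子进程而不是线程,worker 中对变量(例如 `indexSize`、`nextQueue`)的修改不会反映到主进程中,需要通过 IPC(worker 中的 `process.send()`,主进程中的 `worker.on("message")`)把结果传回主进程。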

这是我的实现(代码如下):

// Answer's crawl loop, corrected.
//
// cluster.fork() starts separate *processes* (not threads). Every worker
// re-runs this script with its own copies of `queue`, `indexSize`, etc., so
// assignments made inside a worker are never seen by the master. The earlier
// version therefore never grew `indexSize` or `nextQueue` in the master, read
// the non-existent `cluster.id`, and re-derived each chunk from the worker's
// stale copy of `queue`. Work now flows explicitly over IPC:
//   1. The master splits `queue` into one chunk per worker and forks them.
//   2. Each worker sends {type: "ready"}, receives its chunk, crawls it and
//      replies {type: "done", indexed, hrefs} before exiting.
//   3. The master waits for every worker's IPC channel to disconnect, then
//      merges the reports into `indexSize` and the next queue.
// The loop also stops when the queue runs dry (the old chunking helper spun
// forever on an empty queue) and uses `<` so overshooting the limit ends it.
if (cluster.isMaster) {
  /**
   * Splits `items` into `parts` contiguous chunks whose sizes differ by at
   * most one, larger chunks first (the same sizes as the original round-robin
   * allocation).
   * @param {Array} items
   * @param {number} parts - number of chunks; must be >= 1
   * @returns {Array<Array>} chunks in queue order
   */
  const splitIntoChunks = (items, parts) => {
    const baseSize = Math.floor(items.length / parts);
    const remainder = items.length % parts;
    const chunks = [];
    let start = 0;

    for (let i = 0; i < parts; i++) {
      const end = start + baseSize + (i < remainder ? 1 : 0);
      chunks.push(items.slice(start, end));
      start = end;
    }

    return chunks;
  };

  /**
   * Forks one worker, sends it `chunk` once it reports ready, and resolves
   * with its {indexed, hrefs} report after its IPC channel disconnects.
   * A worker that disconnects without reporting (e.g. it crashed) resolves
   * with an empty report and is logged, so one failure cannot stall the round.
   * @param {Array} chunk - queue items; sent as JSON, so only plain data survives
   * @returns {Promise<{indexed: number, hrefs: Array}>}
   */
  const runWorker = (chunk) => new Promise((resolve) => {
    const worker = cluster.fork();
    let report = null;

    // Listeners are per worker, so nothing accumulates across rounds (the
    // old cluster-wide "disconnect" listener was re-added every iteration).
    worker.on("message", (message) => {
      if (message.type === "ready") {
        worker.send(chunk);
      } else if (message.type === "done") {
        report = { indexed: message.indexed, hrefs: message.hrefs };
      }
    });

    // Emitted when the IPC channel closes, whether the worker exited normally,
    // crashed or was killed.
    worker.on("disconnect", () => {
      if (report === null) {
        console.error(`Worker ${worker.id} disconnected without reporting; its results are lost.`);
        resolve({ indexed: 0, hrefs: [] });
        return;
      }
      resolve(report);
    });
  });

  while (indexSize < indexSizeLimit && queue.length > 0) {
    // Guard against a zero CPU count so the chunking never divides by zero.
    const numberOfThreads = Math.max(1, Math.min(numberOfCPUs, queue.length));
    // queueChunks: [[{_id: ..., ...}], [...], ...]
    const queueChunks = splitIntoChunks(queue, numberOfThreads);
    const reports = await Promise.all(queueChunks.map((chunk) => runWorker(chunk)));

    // Links already in this round's queue must not be queued again, and
    // several workers may discover the same link independently.
    const seen = new Set(queue.map((queueItem) => queueItem._id));
    const nextQueue = [];

    for (const { indexed, hrefs } of reports) {
      indexSize += indexed;

      for (const href of hrefs) {
        if (!seen.has(href)) {
          seen.add(href);
          nextQueue.push({ _id: href });
        }
      }
    }

    // Inserted here, after cross-worker de-duplication, rather than per page
    // in the workers, where two workers could insert the same _id.
    if (nextQueue.length > 0) {
      await Queue.insertMany(nextQueue);
    }

    queue = nextQueue;
  }
} else {
  /**
   * Crawls one queue item: fetches the page, saves it, and collects its
   * outgoing links that are not yet stored as a Site. The item is removed
   * from the Queue collection whether or not crawling succeeded (the original
   * also dequeued "Failure" responses). Errors are logged per URL so one bad
   * page cannot abort the rest of the chunk.
   * @param {{_id: string}} queueItem - `_id` is the URL to crawl
   * @returns {Promise<{indexed: number, hrefs: Array}>}
   */
  const crawlQueueItem = async (queueItem) => {
    const url = queueItem._id;
    let indexed = 0;
    let hrefs = [];

    try {
      const webcode = await request(url);

      if (webcode !== "Failure") {
        // `new`, as in the question's code; the answer called Document() bare.
        const document = new Document(url, webcode);
        const hrefsQuery = Query(document.hrefs());

        await document.save();
        indexed = 1;

        // NOTE(review): assumes individualize() returns plain strings; they
        // are sent to the master over IPC (JSON-serialized).
        const candidates = hrefsQuery.individualize();
        // Awaited: the original filtered on the pending Promise itself, so
        // `hrefIncidences[i]` was always undefined and no link was ever kept.
        const incidences = await Promise.all(
          candidates.map((href) => Site.countDocuments({ url: href })),
        );
        hrefs = candidates.filter((_, i) => incidences[i] === 0);
      }
    } catch (error) {
      console.error(`Failed to crawl ${url}:`, error);
    } finally {
      await Queue.deleteOne({ _id: url });
    }

    return { indexed, hrefs };
  };

  // Attach the listener before announcing readiness so the chunk cannot
  // arrive while nobody is listening for it.
  const chunk = await new Promise((resolve) => {
    process.once("message", resolve);
    process.send({ type: "ready" });
  });

  const results = await Promise.all(chunk.map(crawlQueueItem));
  const report = {
    type: "done",
    indexed: results.reduce((total, result) => total + result.indexed, 0),
    hrefs: results.flatMap((result) => result.hrefs),
  };

  // Exit only after the report has been handed to the IPC channel;
  // calling process.exit() right after process.send() could drop it.
  await new Promise((resolve) => process.send(report, resolve));
  process.exit(0);
}

关于javascript - 如何在所有线程运行完毕后运行代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66218951/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com