gpt4 book ai didi

node.js - 如何管理nodejs中Redis的kue模块中的TTL超出错误?

转载 作者:可可西里 更新时间:2023-11-01 11:25:04 29 4
gpt4 key购买 nike

我正在开发 NodeJS 应用程序,在该应用程序中,我使用 Redis 的 kue 模块来管理队列以执行任务。

出现错误“超过 TTL”。由于这个错误,整个 redis 任务队列都被阻塞了,它可能不会自动启动队列,也不允许执行队列中的其他待处理任务。

根据 Kue 文档:

Job producers can set an expiry value for the time their job can live in active state, so that if workers didn't reply in timely fashion, Kue will fail it with TTL exceeded error message preventing that job from being stuck in active state and spoiling concurrency.

我不知道如何处理这种情况。我发布了我的代码,我需要帮助来解决问题。

"use strict";
const redisConst = require('../../constants/' + process.env.NODE_ENV + '.json').redis;
const DataConsolidationController = require('../../api/data-consolidation/controller/data-consolidation-controller');
const ContactController = require('../../api/contact/controller/contact-controller');
const GmailController = require('../../api/email/gmail/gmail-controller');
const FileUploadController = require('../../api/file-upload/controller/file-upload-controller');
var fs = require('fs');
var kue = require('kue');
/*
* If you have a huge concurrency in uncompleted jobs,
* turn this feature off and use queue level events for better memory scaling.
*/
var queue = kue.createQueue({
prefix: 'qt',
redis: redisConst,
jobEvents: false,
removeOnComplete: true
});
var job;
var _io;
var concurrency = 5;

/* queue setting */
queue.on('ready', () => {
console.info('Queue is ready!');
});

// A job get executed
queue.on('job enqueue', (id, type) => {
console.log('Job %s got queued', id);
});

// A job get removed
queue.on('job complete', (id, result) => {
kue.Job.get(id, (err, job) => {
if (err)
return;
job.remove((err) => {
if (err)
throw err;
console.log('Removed completed job #%d', job.id);
});
});
});

queue.on("job process", (id, result) => {
kue.Job.get(id, (err, job) => {
if (err)
return;
console.log("job process is done", job.id);
});
})

queue.on('error', (err) => {
// handle connection errors here
console.error('There was an error in the main queue!');
console.error(err);
console.error(err.stack);
});

queue.watchStuckJobs();

process.once('SIGTERM', (sig) => {
queue.shutdown(5000, (err) => {
console.log('Kue shutdown: ', err || '');
process.exit(0);
});
});


/* workers */
queue.process('import', concurrency, (job, done) => {
switch (job.data.type) {
// File upload import
case 'file-upload-import':
FileUploadController.csvUploadWithQueueTechifyNew(_io.sockets.connected[job.data.socketId], job.data.filePath, job.data.userId)
//FileUploadController.csvUploadWithQueue(_io.sockets.connected[job.data.socketId], job.data.filePath, job.data.userId)
.then(result => {
fs.unlinkSync(job.data.filePath);
done();
})
.then(result => {
ContactController.recommendationEngine(null, job.data.userId);
done();
})
.catch(err => {
console.log(err);
done(err);
});
break;
}
});

function findJobCount() {
queue.activeCount((err, count) => {
if (!err)
console.log('**** Active: ', count);
});
queue.inactiveCount((err, count) => {
if (!err)
console.log('**** Inactive:', count);
});
}

module.exports = class QueueController {
static init(io) {
_io = io;
}

/* producers */
static createJob(name, data) {
if (data.type === 'import-salesforce-data') {
job = queue.create(name, data)
.delay(1000)
.ttl(600000)
.attempts(1)
.backoff(true)
.removeOnComplete(true)
.save((err) => {
if (err) {
console.error(err);
done(err);
} else if (!err) {
done();
}
});
} else {
job = queue.create(name, data)
.delay(1000)
.ttl(120000)
.attempts(1)
.backoff(true)
.removeOnComplete(true);
}

job
.on('start', () => {
console.log('Job', job.id, 'is now running');
findJobCount();
})
.on('complete', () => {
console.log('Job', job.id, 'is done');
findJobCount();
})
.on('failed', () => {
console.log('Job', job.id, 'has failed');
job.remove();
findJobCount();
})
.on("progress", () => {
console.log("job", job.id, "is progressing");
});

job.save((err, result) => {
if (err) {
console.log('Error in adding Job: ' + err);
} else {
console.log("Job saved");
}
});
}
}

最佳答案

超过 TTL 表示您的工作未在 TTL 内完成。 请检查为什么您的任务没有在 TTL 内完成??
根据我对您的代码的理解 - 在 queue.process 中,所有 data.type 都没有被处理,因此 done 永远不会被调用并且作业长时间保持事件状态并最终 ttl 过期。

此外,您还有 5 分钟和 10 分钟的 ttl,因此每个工作会保持 5 或 10 分钟的事件状态,因此不会长时间给其他工作机会。 尽可能减少 TTL。

因为您的并发数为 5 意味着您的所有 5 个并发作业都处于事件和卡住状态,因此无法安排其他作业。 尽可能提高并发性。

您还可以使用 kue-ui-express 获取作业及其状态的 GUI 表示

而且,如果 kue 中有许多作业等待执行,则 kue 可能会溢出一些作业。

关于node.js - 如何管理nodejs中Redis的kue模块中的TTL超出错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46784301/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com