gpt4 book ai didi

node.js - 如何在并发 AWS lambda 函数中管理 Postgres 连接?

转载 作者:行者123 更新时间:2023-12-05 07:24:22 25 4
gpt4 key购买 nike

有人有使用 Postgres 构建并发 AWS Lambda 函数的经验吗?

我必须构建一个 lambda cron,它将数千张发票提取到 Postgres 数据库中。我必须同时为每张发票调用摄取 lambda 函数。问题是,因为它是并发的,所以摄取函数的每个实例都会创建一个到数据库的连接。这意味着,如果我要摄取 1000 个发票,每个发票都会调用一个 lambda 函数,这将创建 1000 个数据库连接。这将耗尽 Postgres 可以处理的最大连接数。调用的 lambda 函数的某些实例将返回一个错误,指出没有更多可用连接。

对于如何处理这个问题,您有什么建议吗?

这是我的一些代码片段:

ingestInvoiceList.js

var AWS = require('aws-sdk');
var sftp = require('ssh2-sftp-client');

var lambda = AWS.Lambda();

exports.handler = async (evenrt) => {
...

let folder_contents;
try {
// fetch list of Zip format invoices
folder_contents = await sftp.list(client_folder);
} catch (err) {
console.log(`[${client}]: ${err.toString()}`);
throw new Error(`[${client}]: ${err.toString()}`);
}

let invoiceCount = 0;

let funcName = 'ingestInvoice';


for (let item of folder_contents) {
if (item.type === '-') {
let payload = JSON.stringify({
invoice: item.name
});
let params = {
FunctionName: funcName,
Payload: payload,
InvocationType: 'Event'
};


//invo9ke ingest invoice concurrently
let result = await new Promise((resolve) => {
lambda.invoke(params, (err, data) => {
if (err) resolve(err);
else resolve(data);
});
});

console.log('result: ', result);

invoiceCount++;
}
}
...
}

ingestInvoice.js

var AWS = require('aws-sdk');
var sftp = require('ssh2-sftp-client');
var DBClient = require('db.js')l

var lambda = AWS.Lambda();

exports.handler = async (evenrt) => {
...

let invoice = event.invoice;
let client = 'client name';

let db = new DBClient();

try {
console.log(`[${client}]: Extracting documents from ${invoice}`);

try {
// get zip file from sftp server
await sftp.fastGet(invoice, '/tmp/tmp.zip', {});
} catch (err) {
throw err;
}


let zip;
try {
// extract the zip file...
zip = await new Promise((resolve, reject) => {
fs.readFile("/tmp/tmp.zip", async function (err, data) {
if (err) return reject(err);

let unzippedData;
try {
unzippedData = await JSZip.loadAsync(data);
} catch (err) {
return reject(err);
}

return resolve(unzippedData);
});
});

} catch (err) {
throw err;
}

let unibillRegEx = /unibill.+\.txt/g;

let files = [];
zip.forEach(async (path, entry) => {
if (unibillRegEx.exec(entry.name)) {
files['unibillObj'] = entry;
} else {
files['pdfObj'] = entry;
}
});


// await db.getClient().connect();
await db.setSchema(client);
console.log('Schema has been set.');

let unibillStr = await files.unibillObj.async('string');

console.log('ingesting ', files.unibillObj.name);

//Do ingestion queries here...
...

await uploadInvoiceDocsToS3(client, files);

} catch (err) {
console.error(err.stack);
throw err;
} finally {
try {
// console.log('Disconnecting from database...');
// await db.endClient();
console.log('Disconnecting from SFTP...');
await sftp.end();
} catch (err) {
console.log('ERROR: ' + err.toString());
throw err;
}
}
...
}

db.js

var { Pool } = require('pg');

module.exports = class DBClient {
constructor() {
this.pool = new Pool();
}

async setSchema(schema) {
await this.execQuery(`SET search_path TO ${schema}`);
}

async execQuery(sql) {
return await this.pool.query(sql);
}
}

如果有任何答案,我们将不胜感激,谢谢!

最佳答案

我看到了两种处理方法。最终,这取决于您希望以多快的速度处理这些数据。

  1. 将 Lambda 的并发设置更改为“保留并发: Reserve Concurrency .

这将允许您限制并发运行的 Lambda 数量(有关更多详细信息,请参阅 this link)。

  1. 更改您的代码以在 SQS 队列中对要完成的工作进行排队。从那里您必须创建另一个 Lambda 以由队列触发并根据需要处理它。这个 Lambda 可以决定一次从队列中拉出多少,它也可能需要限制并发性。但是您可以将其调整为,例如,最多运行 15 分钟,这可能足以清空队列并且不会终止数据库。或者,如果您的最大并发数为 100,那么您可以快速处理而不会终止数据库。

关于node.js - 如何在并发 AWS lambda 函数中管理 Postgres 连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55479591/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com