gpt4 book ai didi

node.js - NodeJS、promise、streams - 处理大型 CSV 文件

转载 作者:IT老高 更新时间:2023-10-28 23:24:33 25 4
gpt4 key购买 nike

我需要构建一个函数来处理用于 bluebird.map() 调用的大型 CSV 文件。鉴于文件的潜在大小,我想使用流式传输。

这个函数应该接受一个流(一个 CSV 文件)和一个函数(处理来自流的 block ),并在文件被读取结束(已解决)或错误(被拒绝)时返回一个 promise 。

所以,我开始:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);

// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});

var db = pgp(api.config.mailroom.fileMakerDbConfig);

return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});

}

现在,我有两个相互关联的问题:

  1. 我需要限制正在处理的实际数据量,以免造成内存压力。
  2. 作为 processor 参数传递的函数通常是异步的,例如通过基于 promise 的库(现在: pg-promise)。因此,它会在内存中创造一个 promise ,然后不断地继续前进。

pg-promise 库具有管理此功能的函数,例如 page() ,但我无法提前说明如何将流事件处理程序与这些 Promise 方法混合使用。现在,我在每个 read() 之后的 readable 部分的处理程序中返回一个 promise ,这意味着我创建了大量 promise 的数据库操作并最终出错,因为我达到进程内存限制。

有没有人有一个可以用作跳跃点的工作示例?

更新:可能不止一种给猫剥皮的方法,但这是可行的:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

// some checks trimmed out for example

var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);

var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();

if(records.length)
return records;
};

var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};

parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});

return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}

有人发现这种方法存在潜在问题吗?

最佳答案

您可能想查看 promise-streams

var ps = require('promise-streams');
passedStream
.pipe(csv.parse({trim: true}))
.pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
.wait().then(_ => {
console.log("All done!");
});

适用于背压和一切。

关于node.js - NodeJS、promise、streams - 处理大型 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33129677/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com