gpt4 book ai didi

javascript - 如何聚合 Node.js 流回调中异步函数生成的 Promise?

转载 作者:行者123 更新时间:2023-12-03 07:38:25 25 4
gpt4 key购买 nike

我有一个 Node.js Typescript 程序,在其中尝试逐行解析大型 CSV 文件并异步处理这些行。更具体地说,我需要一个函数来:

  1. 打开 CSV 文件。
  2. 将下一行解析为对象。
  3. (理想情况下)收集一定数量的对象进行批处理。
  4. 将对象传递给异步函数进行处理(返回 Promise)。
  5. 从处理函数中收集 promise 。

一些要求和注意事项:

  • 我需要对这些 promise 进行调查以了解进展情况。
  • 假设这些 CSV 文件很大;逐行流式传输是必要的。
  • 当这些处理操作正在运行时,我不应该阻止应用程序。
  • 返回一系列 Promise 可能不是正确的方法,尤其是当我尝试读取包含一百万行的文件时。
  • 我需要某种 Hook 来取消或重试失败的操作。

这是我已经运行的一些测试代码。 ObjectStream是一个自定义 Node.js 转换,可将 CSV 行转换为对象。

function parseFileAsync(filePath: string): Promise<any> {
var doParseFileAsync = (filePath: string) => {
var streamDeferred = q.defer<Promise<any>[]>();
var promises: Promise<any>[] = [];
var propertyNames: string[] = [];

var stream = fs.createReadStream(filePath, { encoding: "utf8" })
.pipe(new LineStream({ objectMode: true }))
.pipe(new ObjectStream({ objectMode: true }));

stream.on("readable", () => {
var obj: Object;
while ((obj = stream.read()) !== null) {
console.log(`\nRead an object...`);

var operationDeferred = q.defer<any>();
operationDeferred.resolve(doSomethingAsync(obj));
promises.push(operationDeferred.promise);
}
});
stream.on("end", () => {
streamDeferred.resolve(promises);
});

return streamDeferred.promise;
}

return doParseFileAsync(filePath)
.then((result: Promise<any>[]) => {
return q.all(result);
});
}
parseFileAsync(filePath)
.done((result: any[]) => {
console.log(`\nFinished reading and processing the file:\n\t${result.toString()}`);
});

决赛done调用在流开始运行之前执行,因为 parseFileAsync立即满足一个空数组;该直播还没有机会兑现任何 promise 。

经过几天的搜索,我仍然不确定正确的方法是什么。 Node/JavaScript 专家:帮忙?

更新

代码已更新,我的 Promise 现在运行良好。但是,我需要一种方法来 Hook 流并根据需要取消该过程。我还需要一种方法来重试任何失败的操作。

最佳答案

我遇到了程序架构中的一些限制,这些限制不允许我按照自己的意愿自由地传递 Promise。因此,我没有开始做出一堆 promise ,而是决定等到上一批 promise 完成后再开始新的 promise 。这是我采取的方法:

  1. 将流内容分离到其自己的函数中,该函数接受连续标记。返回值将包含读取的数据以及继续 token (如果有更多数据要读取):

    function readFile(filepath: string, lines: number, start: any): Promise<any> {
    ...
    }
  2. 定义一个将运行可重试操作的函数。在此函数体内,检索并处理文件中的数据 block 。如果结果有延续标记,则再次“递归”调用操作函数:

    function processFile(filepath: string, next: any): Promise<any> {
    var chunkSize = 1;
    return readLines(filepath, chunkSize, next)
    .then((result) => {
    // Do something with `result.lines`
    ...
    if (result.next) {
    return parseFile(filepath, result.next);
    }
    });
    }

瞧!对 block 进行长时间运行的操作,并且很容易报告进度。

关于javascript - 如何聚合 Node.js 流回调中异步函数生成的 Promise?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35512496/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com