
javascript - fs.createWriteStream does not apply backpressure when writing data to a file, causing high memory usage

Question

I'm trying to scan a drive's directory tree (recursively walking all paths) and write every path to a file (as it finds them) using fs.createWriteStream, in order to keep memory usage low, but it isn't working: memory usage reaches 2GB during the scan.

Expected

I expected fs.createWriteStream to automatically manage memory/disk usage at all times, keeping memory usage at a minimum through backpressure.

Code

const fs = require('fs')
const walkdir = require('walkdir')

let dir = 'C:/'

let options = {
"max_depth": 0,
"track_inodes": true,
"return_object": false,
"no_return": true,
}

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

let walker = walkdir(dir, options)

walker.on('path', (path) => {
  wstream.write(path + '\n')
})

walker.on('end', (path) => {
  wstream.end()
})

Is it because I'm not using .pipe()? I tried creating a new Stream.Readable({ read() {} }) and then, inside the .on('path') emitter, pushing the paths into it with readable.push(path), but that didn't really work.
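
(A rough sketch of what that attempt probably looked like; the variable names are guesses. The point is that even though pipe() applies backpressure between the Readable and the Writable, nothing here reacts to the return value of readable.push(), so the Readable's own internal buffer still grows without bound while the walker keeps emitting paths.)

const fs = require('fs')
const walkdir = require('walkdir')
const { Readable } = require('stream')

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

// No-op _read: data is pushed in from the outside instead of being pulled.
const readable = new Readable({ read() {} })
readable.pipe(wstream)  // pipe() pauses/resumes between readable and wstream only

const walker = walkdir('C:/')

walker.on('path', (path) => {
  // push() returns false once the Readable's internal buffer is full, but that
  // signal is ignored here, so paths keep accumulating in memory regardless.
  readable.push(path + '\n')
})

walker.on('end', () => readable.push(null))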

Update:

Approach 2:

I tried the drain method proposed in the answers, but it doesn't help much. It did reduce memory usage to about 500MB (which is still far too much for a stream), but it slowed the code down significantly (from seconds to minutes).
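
(For reference, the drain-based variant was presumably along these lines. This is a sketch, not the exact code from that answer, and it assumes the walkdir emitter exposes pause() and resume().)

const fs = require('fs')
const walkdir = require('walkdir')

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")
const walker = walkdir('C:/', { "no_return": true })

walker.on('path', (path) => {
  // write() returns false once the writable's internal buffer passes
  // highWaterMark; stop the walker until that buffer has drained to disk.
  if (!wstream.write(path + '\n')) {
    walker.pause()                               // assumed walkdir API
    wstream.once('drain', () => walker.resume()) // assumed walkdir API
  }
})

walker.on('end', () => wstream.end())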

Approach 3:

I also tried readdirp, which uses even less memory (~400MB) and is faster, but I don't know how to pause it and apply the drain method there to reduce memory usage further:

const readdirp = require('readdirp')

let dir = 'C:/'
const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

readdirp(dir, {alwaysStat: false, type: 'files_directories'})
  .on('data', (entry) => {
    wstream.write(`${entry.fullPath}\n`)
  })

Approach 4:

I also tried doing this with a custom recursive walker. Although it uses only 30MB of memory, which is what I want, it is about 10 times slower than the readdirp approach, and it is synchronous, which is undesirable:

const fs = require('fs')
const path = require('path')

let dir = 'C:/'
function customRecursiveWalker(dir) {
  fs.readdirSync(dir).forEach(file => {
    let fullPath = path.join(dir, file)
    // Folders
    if (fs.lstatSync(fullPath).isDirectory()) {
      fs.appendFileSync("C:/Users/USERNAME/Desktop/paths.txt", `${fullPath}\n`)
      customRecursiveWalker(fullPath)
    }
    // Files
    else {
      fs.appendFileSync("C:/Users/USERNAME/Desktop/paths.txt", `${fullPath}\n`)
    }
  })
}
customRecursiveWalker(dir)
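
(For comparison, a minimal asynchronous sketch of the same walker using fs.promises and a write stream: it awaits the 'drain' event whenever write() reports a full buffer, so memory stays bounded without blocking the event loop. The function name asyncRecursiveWalker and the bare-bones error handling are illustrative only.)

const fs = require('fs')
const path = require('path')
const { readdir } = require('fs').promises

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

async function asyncRecursiveWalker(dir) {
  let entries
  try {
    // withFileTypes avoids a separate lstat call per entry.
    entries = await readdir(dir, { withFileTypes: true })
  } catch (err) {
    return  // e.g. EPERM on system folders; decide what to ignore in real code
  }

  for (const entry of entries) {
    const fullPath = path.join(dir, entry.name)

    // Respect the write stream's backpressure signal.
    if (!wstream.write(`${fullPath}\n`)) {
      await new Promise(resolve => wstream.once('drain', resolve))
    }

    if (entry.isDirectory()) {
      await asyncRecursiveWalker(fullPath)
    }
  }
}

asyncRecursiveWalker('C:/').then(() => wstream.end())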

Best Answer

Preliminary observation: you have tried multiple methods to get the result you want. One complication when comparing the methods you used is that they do not all do the same work. If you run tests on a file tree that contains only regular files and no mount points, you can probably compare the methods fairly, but once you start adding mount points, symbolic links, etc., you may get different memory and time statistics merely because one method excludes files that another method includes.

I initially tried a solution using readdirp, but unfortunately that library looks buggy to me. Running it on my system, I got inconsistent results: one run would output 10Mb of data, another run with the same input parameters would output 22Mb, then I would get yet another number, and so on. I looked at the code and found that it does not respect the return value of push:

_push(entry) {
  if (this.readable) {
    this.push(entry);
  }
}

According to the documentation, the push method may return false, in which case the Readable stream is supposed to stop producing data and wait until _read is called again. readdirp entirely ignores that part of the specification. Paying attention to the return value of push is essential for handling backpressure correctly. There are other things I found questionable in that code as well.

So I abandoned that and worked on a proof of concept showing how it could be done. The crucial parts are:

  1. When the push method returns false, we must stop adding data to the stream. Instead, we record where we were and stop.

  2. We start again only when _read is called.

If you uncomment the console.log statements that print START and STOP, you'll see them printed in succession on the console: we start, produce data until Node tells us to stop, then we stop until Node tells us to start again, and so on.

const stream = require("stream");
const fs = require("fs");
const { readdir, lstat } = fs.promises;
const path = require("path");

class Walk extends stream.Readable {
  constructor(root, maxDepth = Infinity) {
    super();

    this._maxDepth = maxDepth;

    // These fields allow us to remember where we were when we have to pause our
    // work.

    // The path of the directory to process when we resume processing, and the
    // depth of this directory.
    this._curdir = [root, 1];

    // The directories still to process.
    this._dirs = [this._curdir];

    // The list of files to process when we resume processing.
    this._files = [];

    // The location in `this._files` where to continue processing when we resume.
    this._ix = 0;

    // A flag recording whether or not the fetching of files is currently going
    // on.
    this._started = false;
  }

  async _fetch() {
    // Recall where we were by loading the state in local variables.
    let files = this._files;
    let dirs = this._dirs;
    let [dir, depth] = this._curdir;
    let ix = this._ix;

    while (true) {
      // If we've gone past the end of the files we were processing, then
      // just forget about them. This simplifies the code that follows a bit.
      if (ix >= files.length) {
        ix = 0;
        files = [];
      }

      // Read directories until we have files to process.
      while (!files.length) {
        // We've read everything, end the stream.
        if (dirs.length === 0) {
          // This is how the stream API requires us to indicate the stream has
          // ended.
          this.push(null);

          // We're no longer running.
          this._started = false;
          return;
        }

        // Here, we get the next directory to process and get the list of
        // files in it.
        [dir, depth] = dirs.pop();

        try {
          files = await readdir(dir, { withFileTypes: true });
        }
        catch (ex) {
          // This is a proof-of-concept. In a real application, you should
          // determine what exceptions you want to ignore (e.g. EPERM).
        }
      }

      // Process each file.
      for (; ix < files.length; ++ix) {
        const dirent = files[ix];
        // Don't include in the results those files that are not directories,
        // files or symbolic links.
        if (!(dirent.isFile() || dirent.isDirectory() || dirent.isSymbolicLink())) {
          continue;
        }

        const fullPath = path.join(dir, dirent.name);
        if (dirent.isDirectory() && depth < this._maxDepth) {
          // Keep track that we need to walk this directory.
          dirs.push([fullPath, depth + 1]);
        }

        // Finally, we can put the data into the stream!
        if (!this.push(`${fullPath}\n`)) {
          // If the push returned false, we have to stop pushing results to the
          // stream until _read is called again, so we have to stop.

          // Uncomment this if you want to see when the stream stops.
          // console.log("STOP");

          // Record where we were in our processing.
          this._files = files;
          // The element at ix *has* been processed, so ix + 1.
          this._ix = ix + 1;
          this._curdir = [dir, depth];

          // We're stopping, so indicate that!
          this._started = false;
          return;
        }
      }
    }
  }

  async _read() {
    // Do not start the process that puts data on the stream over and over
    // again.
    if (this._started) {
      return;
    }

    this._started = true; // Yep, we've started.

    // Uncomment this if you want to see when the stream starts.
    // console.log("START");

    await this._fetch();
  }
}

// Change the paths to something that makes sense for you.
stream.pipeline(new Walk("/home/", 5),
                fs.createWriteStream("/tmp/paths3.txt"),
                (err) => console.log("ended with", err));

When I run your first attempt, the one using walkdir, I get the following statistics:

  • Elapsed time (wall clock): 59 sec
  • Maximum resident set size: 2.90 GB

When I use the code shown above:

  • Elapsed time (wall clock): 35 sec
  • Maximum resident set size: 0.1 GB

The file tree I use for testing produces a file list that is 792 MB in size.

Regarding "javascript - fs.createWriteStream does not apply backpressure when writing data to a file, causing high memory usage", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56040018/
