I'm trying to scan a drive's directory tree (recursively walking all paths) and write every path to a file as it is found, using fs.createWriteStream, in order to keep memory usage low. But it isn't working: memory usage reaches 2 GB during the scan.
I expected fs.createWriteStream to manage memory/disk usage automatically at all times, keeping memory usage at a minimum with backpressure.
const fs = require('fs')
const walkdir = require('walkdir')
let dir = 'C:/'
let options = {
"max_depth": 0,
"track_inodes": true,
"return_object": false,
"no_return": true,
}
const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")
let walker = walkdir(dir, options)
walker.on('path', (path) => {
wstream.write(path + '\n')
})
walker.on('end', (path) => {
wstream.end()
})
Is it because I'm not using .pipe()? I tried creating a new Stream.Readable({read(){}}) and then pushing paths into it with readable.push(path) inside the .on('path') handler, but that didn't really work.
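Roughly, that attempt looked like this (a reconstruction from the description above, not the exact code). Note that .pipe() only connects the two streams; it cannot slow down the walkdir emitter, so because push()'s return value is ignored, paths still pile up in the Readable's internal buffer whenever the disk can't keep up:
const fs = require('fs')
const { Readable } = require('stream')
const walkdir = require('walkdir')

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")
// No-op read(): data is pushed from the emitter callbacks instead.
const rstream = new Readable({ read() {} })

const walker = walkdir('C:/', { no_return: true })
// push() returns false when the Readable's buffer is full, but nothing
// here reacts to that, so memory usage still grows.
walker.on('path', (path) => rstream.push(path + '\n'))
walker.on('end', () => rstream.push(null))

rstream.pipe(wstream)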
Update:
Method 2:
I tried the drain method proposed in the answer, but it didn't help much. It did reduce memory usage to ~500 MB (which is still far too much for a stream), but it slowed the code down significantly (from seconds to minutes).
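For reference, the drain approach looked roughly like this (a sketch, assuming walkdir's emitter supports pause()/resume() as its README describes):
const fs = require('fs')
const walkdir = require('walkdir')

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")
const walker = walkdir('C:/', { no_return: true })

walker.on('path', (path) => {
  // write() returns false once the stream's internal buffer is full;
  // pause the walker and wait for 'drain' before producing more paths.
  if (!wstream.write(path + '\n')) {
    walker.pause()
    wstream.once('drain', () => walker.resume())
  }
})
walker.on('end', () => wstream.end())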
Method 3:
I also tried readdirp; it uses even less memory (~400 MB) and is faster, but I don't know how to pause it and apply the drain method there to reduce memory usage further:
const readdirp = require('readdirp')
let dir = 'C:/'
const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")
readdirp(dir, {alwaysStat: false, type: 'files_directories'})
.on('data', (entry) => {
wstream.write(`${entry.fullPath}\n`)
})
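One way to get backpressure without manual pausing (a sketch assuming readdirp v3's stream interface, where readdirp() returns an object-mode Readable of entry objects) is to pipe it through a Transform into the write stream; pipeline() then propagates backpressure end to end. As the answer below notes, however, readdirp's own backpressure handling turned out to be buggy, so this may not actually bound memory:
const fs = require('fs')
const { Transform, pipeline } = require('stream')
const readdirp = require('readdirp')

// Convert each entry object coming from readdirp into a line of text.
const toLine = new Transform({
  writableObjectMode: true,
  transform(entry, _encoding, callback) {
    callback(null, `${entry.fullPath}\n`)
  }
})

pipeline(
  readdirp('C:/', { type: 'files_directories' }),
  toLine,
  fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt"),
  (err) => { if (err) console.error('walk failed:', err) }
)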
Method 4:
I also tried doing this with a custom recursive walker. Even though it uses only 30 MB of memory, which is what I wanted, it is about 10x slower than the readdirp method, and it is synchronous, which is undesirable:
const fs = require('fs')
const path = require('path')
let dir = 'C:/'
function customRecursiveWalker(dir) {
fs.readdirSync(dir).forEach(file => {
let fullPath = path.join(dir, file)
// Folders
if (fs.lstatSync(fullPath).isDirectory()) {
fs.appendFileSync("C:/Users/USERNAME/Desktop/paths.txt", `${fullPath}\n`)
customRecursiveWalker(fullPath)
}
// Files
else {
fs.appendFileSync("C:/Users/USERNAME/Desktop/paths.txt", `${fullPath}\n`)
}
})
}
customRecursiveWalker(dir)
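For comparison, here is a minimal asynchronous variant of this walker (a sketch, not code from the answer below). It keeps a single write stream open instead of calling appendFileSync for every entry, awaits 'drain' when the buffer fills, and uses withFileTypes to avoid the per-entry lstat call:
const fs = require('fs')
const path = require('path')
const { readdir } = require('fs').promises

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

// Resolve immediately while the buffer has room; otherwise wait for 'drain'.
function writeLine(line) {
  return wstream.write(line)
    ? Promise.resolve()
    : new Promise(resolve => wstream.once('drain', resolve))
}

async function walk(dir) {
  let entries = []
  try {
    entries = await readdir(dir, { withFileTypes: true })
  } catch (err) {
    // e.g. EPERM on protected system folders; skipped in this sketch
  }
  for (const entry of entries) {
    const fullPath = path.join(dir, entry.name)
    await writeLine(fullPath + '\n')
    if (entry.isDirectory()) {
      await walk(fullPath)
    }
  }
}

walk('C:/').then(() => wstream.end())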
Best Answer
Preliminary observation: you've tried multiple methods to get the result you want. One complication when comparing them is that they don't all do the same work. If you run tests on a file tree containing only regular files and no mount points, you can probably compare the methods fairly, but once you add mount points, symbolic links, etc., you may get different memory and time statistics simply because one method excludes files that another method includes.
I initially tried a solution using readdirp, but unfortunately that library appears buggy to me. Running it on my system, I got inconsistent results: one run would output 10 MB of data, another run with the same input parameters would output 22 MB, then I'd get yet another number, and so on. I looked at the code and found that it does not respect the return value of push:
_push(entry) {
if (this.readable) {
this.push(entry);
}
}
According to the documentation, the push method may return false, in which case the Readable stream should stop producing data and wait until _read is called again. readdirp entirely ignores that part of the specification. Paying attention to push's return value is essential for handling backpressure correctly. There are a few other problems in that code as well.
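A minimal fix (hypothetical, just to illustrate the point) would at least propagate the return value so the surrounding code can stop producing entries until _read is called again:
_push(entry) {
  if (this.readable) {
    // Let the caller see whether the stream wants more data.
    return this.push(entry);
  }
  return false;
}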
So I abandoned that approach and worked on a proof of concept showing how to do it. The key parts are:
- When the push method returns false, we must stop adding data to the stream. Instead, we record where we were and stop.
- We start again only when _read is called.
If you uncomment the console.log statements that print START and STOP, you will see them printed in succession on the console. We start, produce data until Node tells us to stop, then we stop until Node tells us to start again, and so on.
const stream = require("stream");
const fs = require("fs");
const { readdir, lstat } = fs.promises;
const path = require("path");
class Walk extends stream.Readable {
constructor(root, maxDepth = Infinity) {
super();
this._maxDepth = maxDepth;
// These fields allow us to remember where we were when we have to pause our
// work.
// The path of the directory to process when we resume processing, and the
// depth of this directory.
this._curdir = [root, 1];
// The directories still to process.
this._dirs = [this._curdir];
// The list of files to process when we resume processing.
this._files = [];
// The location in `this._files` were to continue processing when we resume.
this._ix = 0;
// A flag recording whether or not the fetching of files is currently going
// on.
this._started = false;
}
async _fetch() {
// Recall where we were by loading the state in local variables.
let files = this._files;
let dirs = this._dirs;
let [dir, depth] = this._curdir;
let ix = this._ix;
while (true) {
// If we've gone past the end of the files we were processing, then
// just forget about them. This simplifies the code that follows a bit.
if (ix >= files.length) {
ix = 0;
files = [];
}
// Read directories until we have files to process.
while (!files.length) {
// We've read everything, end the stream.
if (dirs.length === 0) {
// This is how the stream API requires us to indicate the stream has
// ended.
this.push(null);
// We're no longer running.
this._started = false;
return;
}
// Here, we get the next directory to process and get the list of
// files in it.
[dir, depth] = dirs.pop();
try {
files = await readdir(dir, { withFileTypes: true });
}
catch (ex) {
// This is a proof-of-concept. In a real application, you should
// determine what exceptions you want to ignore (e.g. EPERM).
}
}
// Process each file.
for (; ix < files.length; ++ix) {
const dirent = files[ix];
// Don't include in the results those files that are not directories,
// files or symbolic links.
if (!(dirent.isFile() || dirent.isDirectory() || dirent.isSymbolicLink())) {
continue;
}
const fullPath = path.join(dir, dirent.name);
if (dirent.isDirectory() && depth < this._maxDepth) {
// Keep track that we need to walk this directory.
dirs.push([fullPath, depth + 1]);
}
// Finally, we can put the data into the stream!
if (!this.push(`${fullPath}\n`)) {
// If the push returned false, we have to stop pushing results to the
// stream until _read is called again, so we have to stop.
// Uncomment this if you want to see when the stream stops.
// console.log("STOP");
// Record where we were in our processing.
this._files = files;
// The element at ix *has* been processed, so ix + 1.
this._ix = ix + 1;
this._curdir = [dir, depth];
// We're stopping, so indicate that!
this._started = false;
return;
}
}
}
}
async _read() {
// Do not start the process that puts data on the stream over and over
// again.
if (this._started) {
return;
}
this._started = true; // Yep, we've started.
// Uncomment this if you want to see when the stream starts.
// console.log("START");
await this._fetch();
}
}
// Change the paths to something that makes sense for you.
stream.pipeline(new Walk("/home/", 5),
fs.createWriteStream("/tmp/paths3.txt"),
(err) => console.log("ended with", err));
When I run your first attempt (the one using walkdir) here, I get the following statistics:
When I use the code shown above:
The file tree I used for testing produces a 792 MB list of files.
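If you want to collect comparable numbers yourself, one option (a hypothetical helper, not part of the original test setup) is to sample Node's built-in process.memoryUsage() while the walk runs:
// Print resident set size and heap usage once per second.
const timer = setInterval(() => {
  const { rss, heapUsed } = process.memoryUsage()
  console.log(`rss=${(rss / 2 ** 20).toFixed(0)} MB, heap=${(heapUsed / 2 ** 20).toFixed(0)} MB`)
}, 1000)
// unref() so the sampler doesn't keep the process alive after the walk ends.
timer.unref()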
Regarding "javascript - fs.createWriteStream doesn't apply backpressure when writing data to a file, causing high memory usage", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56040018/