gpt4 book ai didi

node.js - 如何将背压应用于 Node 流?

转载 作者:搜寻专家 更新时间:2023-10-31 23:24:29 25 4
gpt4 key购买 nike

在尝试使用 Node.JS 流进行试验时,我遇到了一个有趣的难题。当输入(可读)流推送更多数据时,目标(可写)关心我无法正确应用背压。

我尝试的两种方法是从 Writable.prototype._write 返回 false 并保留对 Readable 的引用,这样我就可以调用 Readable.pause()从可写。这两种解决方案都没有多大帮助,我将对此进行解释。

在我的练习中(您可以查看 the full source as a Gist)我有三个流:

可读 - 密码生成器

util.inherits(PasscodeGenerator, stream.Readable);
function PasscodeGenerator(prefix) {
stream.Readable.call(this, {objectMode: true});
this.count = 0;
this.prefix = prefix || '';
}
PasscodeGenerator.prototype._read = function() {
var passcode = '' + this.prefix + this.count;
if (!this.push({passcode: passcode})) {
this.pause();
this.once('drain', this.resume.bind(this));
}
this.count++;
};

我认为 this.push() 的返回代码足以 self 暂停并等待 drain 事件恢复。

转换 - 哈希器

util.inherits(Hasher, stream.Transform);
function Hasher(hashType) {
stream.Transform.call(this, {objectMode: true});
this.hashType = hashType;
}
Hasher.prototype._transform = function(sample, encoding, next) {
var hash = crypto.createHash(this.hashType);
hash.setEncoding('hex');
hash.write(sample.passcode);
hash.end();
sample.hash = hash.read();
this.push(sample);
next();
};

只需将密码的哈希添加到对象即可。

可写 - SampleConsumer

util.inherits(SampleConsumer, stream.Writable);
function SampleConsumer(max) {
stream.Writable.call(this, {objectMode: true});
this.max = (max != null) ? max : 10;
this.count = 0;
}
SampleConsumer.prototype._write = function(sample, encoding, next) {
this.count++;
console.log('Hash %d (%s): %s', this.count, sample.passcode, sample.hash);
if (this.count < this.max) {
next();
} else {
return false;
}
};

在这里,我想尽快使用数据,直到达到我的最大样本数,然后结束流。我尝试使用 this.end() 而不是 return false 但这导致了可怕的 write called after end 问题。如果样本量很小,返回 false 确实会停止一切,但当样本量很大时,我会收到内存不足错误:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
Aborted (core dumped)

根据 this SO answer理论上,Write 流将返回 false,导致流缓冲直到缓冲区已满(objectMode 默认为 16),最终 Readable 将调用它是 this.pause() 方法。但是 16 + 16 + 16 = 48;缓冲区中有 48 个对象,直到内容填满并且系统被阻塞。实际上更少,因为不涉及克隆,所以它们之间传递的对象是相同的引用。这是否意味着内存中只有 16 个对象,直到高水位线停止一切?

最后我意识到我可以让 Writable 引用 Readable 以使用闭包调用它的 pause 方法。然而,这个解决方案意味着 Writable 流知道很多关于另一个对象的信息。我必须传递一个引用:

var foo = new PasscodeGenerator('foobar');
foo
.pipe(new Hasher('md5'))
.pipe(new SampleConsumer(samples, foo));

这感觉不符合流的工作方式。我认为背压足以导致 Writable 阻止 Readable 推送数据并防止内存不足错误。

一个类似的例子是 Unix head 命令。在 Node 中实现它,我会假设目标可以结束,而不仅仅是忽略导致源继续推送数据,即使目标有足够的数据来满足文件的开头部分也是如此。

我如何以惯用的方式构建自定义流,以便在目标准备好结束时源流不会尝试推送更多数据?

最佳答案

这是一个 known issue _read() 是如何在内部被调用的。由于您的 _read() 始终同步/立即推送,因此内部流实现可以在适当的条件下进入循环。 _read() 实现是 generally expected进行某种异步 I/O(例如从磁盘或网络读取)。

解决此问题的方法(如上面的链接中所述)是使您的 _read() 异步至少某些时候。您也可以在每次调用时使其异步:

PasscodeGenerator.prototype._read = function(n) {
var passcode = '' + this.prefix + this.count;
var self = this;

// `setImmediate()` delays the push until the beginning
// of the next tick of the event loop
setImmediate(function() {
self.push({passcode: passcode});
});

this.count++;
};

关于node.js - 如何将背压应用于 Node 流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34226470/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com