gpt4 book ai didi

node.js - 如何在 node.js 中实现正确处理背压的流?

转载 作者:搜寻专家 更新时间:2023-10-31 23:48:04 26 4
gpt4 key购买 nike

我一辈子都想不出如何实现 stream正确处理背压。你不应该使用暂停和恢复吗?

我有这个实现,我正在尝试使其正常工作:

var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
stream.Readable.call(this, {highWaterMark: highWaterMark})
this.stream = myStream

myStream.on('readable', function() {
var data = myStream.read(5000)
//process.stdout.write("Eff: "+data)
if(data !== null) {
if(!this.push(data)) {
process.stdout.write("Pause")
this.pause()
}
callback(data)
}
}.bind(this))

myStream.on('end', function() {
this.push(null)
}.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
process.stdout.write("resume")
//this.resume() // putting this in for some reason causes the stream to not output???
}

它正确地发送了输出,但没有正确地产生背压。如何更改它以正确支持背压?

最佳答案

好吧,经过大量的反复试验,我终于弄明白了。一些准则:

  • 永远不要使用暂停或恢复(否则它会进入传统的“流动”模式)
  • 永远不要添加“数据”事件监听器(否则它将进入传统的“流动”模式)
  • 实现者有责任跟踪源代码何时可读
  • 实现者有责任跟踪目的地何时需要更多数据
  • 在调用 _read 方法之前,实现不应读取任何数据
  • read 的参数告诉源给它那么多字节,最好将传递给 this._read 的参数传递给源的 read 方法。通过这种方式,您应该能够配置在目的地一次读取多少,并且流链的其余部分应该是自动的。

所以我把它改成了:

更新:我创建了一个 Readable,它在适当的背压下更容易实现,并且应该具有与 Node 的 native 流一样多的灵 active 。

var Readable = stream.Readable
var util = require('util')

// an easier Readable stream interface to implement
// requires that subclasses:
// implement a _readSource function that
// * gets the same parameter as Readable._read (size)
// * should return either data to write, or null if the source doesn't have more data yet
// call 'sourceHasData(hasData)' when the source starts or stops having data available
// calls 'end()' when the source is out of data (forever)
var Stream666 = {}
Stream666.Readable = function() {
stream.Readable.apply(this, arguments)
if(this._readSource === undefined) {
throw new Error("You must define a _readSource function for an object implementing Stream666")
}

this._sourceHasData = false
this._destinationWantsData = false
this._size = undefined // can be set by _read
}
util.inherits(Stream666.Readable, stream.Readable)
Stream666.Readable.prototype._read = function(size) {
this._destinationWantsData = true
if(this._sourceHasData) {
pushSourceData(this, size)
} else {
this._size = size
}
}
Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) {
this._sourceHasData = _sourceHasData
if(_sourceHasData && this._destinationWantsData) {
pushSourceData(this, this._size)
}
}
Stream666.Readable.prototype.end = function() {
this.push(null)
}
function pushSourceData(stream666Readable, size) {
var data = stream666Readable._readSource(size)
if(data !== null) {
if(!stream666Readable.push(data)) {
stream666Readable._destinationWantsData = false
}
} else {
stream666Readable._sourceHasData = false
}
}

// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
// stream - the stream to peek at
// callback - called when there's data sent from the passed stream
var StreamPeeker = function(myStream, callback) {
Stream666.Readable.call(this)
this.stream = myStream
this.callback = callback

myStream.on('readable', function() {
this.sourceHasData(true)
}.bind(this))
myStream.on('end', function() {
this.end()
}.bind(this))
}
util.inherits(StreamPeeker, Stream666.Readable)
StreamPeeker.prototype._readSource = function(size) {
var data = this.stream.read(size)
if(data !== null) {
this.callback(data)
return data
} else {
this.sourceHasData(false)
return null
}
}

旧答案:

// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
// stream - the stream to peek at
// callback - called when there's data sent from the passed stream
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
stream.Readable.call(this)
this.stream = myStream
this.callback = callback
this.reading = false
this.sourceIsReadable = false

myStream.on('readable', function() {
this.sourceIsReadable = true
this._readMoreData()
}.bind(this))

myStream.on('end', function() {
this.push(null)
}.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
this.reading = true
if(this.sourceIsReadable) {
this._readMoreData()
}
}
StreamPeeker.prototype._readMoreData = function() {
if(!this.reading) return;

var data = this.stream.read()
if(data !== null) {
if(!this.push(data)) {
this.reading = false
}
this.callback(data)
}
}

关于node.js - 如何在 node.js 中实现正确处理背压的流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29308878/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com