gpt4 book ai didi

javascript - Node 流: Wait until data is available

转载 作者:太空宇宙 更新时间:2023-11-04 02:08:48 25 4
gpt4 key购买 nike

我有 3 个流(A、B、C),它们通过管道输送到另一个流(A->B->C)。当我启动程序时,B 的 _read get 会立即被调用,因为它通过管道传输到 C。但是,当 A 异步获取数据时,B 流中还没有数据。一旦 B 接收到传递给 B 的 _write 方法的数据,它就会转换数据并发出一个“可读”事件(我手动触发该事件 - 这是假定的执行方式吗?)。

但是什么也没有发生,并且来自 B 的数据没有被任何人消耗(因此 B 的 _read 没有被调用)。我可以通过在 _write() 方法末尾调用(在 B 上) this._read() 来解决此问题。但是,尽管队列已满,这也可能将数据推送给消费者,对吗?

基本上我想将较大的数据 block 发送到B流中,将其分割成较小的数据,然后将它们一一传递给C。所以我想在 B 中有某种缓冲区。

_read(size) {
if(this._lineBuffer.length > 0) {
var stop = false;
while(!stop) {
stop = this.push(this._lineBuffer.splice(0,1));
}
}
if(this._pendingWriteAck) {
this._pendingWriteAck();
this._pendingWriteAck = null;
}
}

_write(chunk, encoding, callback) {
console.log("New chunk for line splitter received");
if(this._buffer) {
this._buffer = Buffer.concat([this._buffer, chunk]);
} else {
this._buffer = chunk;
}
for (; this._offset < this._buffer.length; this._offset++) {
if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
this._lineBuffer.push(this._buffer.slice(0, this._offset).toString());
this._buffer = this._buffer.slice(this._offset + 1);
this._offset = 0;
}
}

if(this._lineBuffer.length > this._maxLineBuffer || this._buffer.length > this._maxRawBuffer) {
console.log("Line Split buffer has reached capacity. Waiting...");
this._pendingWriteAck = callback;
} else {
callback();
}

setImmediate(()=>{
this.emit('readable');
this._read();
})
}

最佳答案

您可以将转换流用于“B”流:

const Transform = require('stream').Transform;

const B = new Transform({
transform(chunk, encoding, callback) {
this._offset = this._offset || 0;
this._buffer = this._buffer ? Buffer.concat([this._buffer, chunk]) : chunk
for (; this._offset < this._buffer.length; this._offset++) {
if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
if (this._offset) {
this.push(this._buffer.slice(0, this._offset), encoding);
}
this._buffer = this._buffer.slice(this._offset + 1);
this._offset = 0;
}
}
callback()
},
flush(callback) {
if (this._buffer && this._buffer.length) {
this.push(this._buffer);
}
callback();
}
});

您可以通过执行以下操作来查看它的工作情况:

let line = 0
B.on('data', (chunk) => process.stdout.write(`${++line}. ${chunk}\n`))
B.write(['Foo', 'Bar', 'Baz', 'Qux', 'Hello '].join('\n'))
B.write(['World', 'The End'].join('\n'))
B.end()

终端的输出将是:

1. Foo
2. Bar
3. Baz
4. Qux
5. Hello World
6. The End

关于javascript - Node 流: Wait until data is available,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43192126/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com