我正在运行一个简单的应用程序,它通过流传输对象,如下所示:
new ReadStreamThatCreatesData()
.pipe(new TransformerStream())
.pipe(new WriteStreamThatActsOnData()
但我希望 WriteStreamThatActsOnData
能够访问 ReadStreamThatCreatesData
中的属性,而 TransformerStream
不必知道它或能够访问它。我想要的伪代码基本上是这样的:
new ReadStreamThatCreatesData()
.storeContext((obj) => obj.property)
.pipe(new TransformerStream())
.retrieveContext((obj, context) => obj.property = context)
.pipe(new WriteStreamThatActsOnData()
但考虑到流的性质,我真的不明白这是怎么可能的。有谁对我如何做这样的事情有任何聪明的想法吗?
我想到的一种方法是将 ReadStreamThatCreatesData 通过管道传输到一个函数,该函数将其分成两个不同的流:一个流是您从项目中提取的上下文属性,另一个流包含对象的其余部分。您可以通过管道将第二个流传输到 TransformerStream,然后通过 zip 运算符将输出与上下文流一起传输,将两个流合并回一个流。然后将其发送到 WriteStreamThatActsOnData。
我认为没有内置的 node.js 函数可以执行此操作,但您可以使用另一个库,例如 RxJS或highland .
这是一个使用高地实现的示例:
'use strict';
/*
Put this content into input.txt. Make sure there are no blank lines in the file:
{ "secret": 1, "val": "a" }
{ "secret": 2, "val": "b" }
{ "secret": 3, "val": "c" }
{ "secret": 4, "val": "d" }
{ "secret": 5, "val": "e" }
After running, output.txt should have this content:
{"val":"A","secret":1}
{"val":"B","secret":2}
{"val":"C","secret":3}
{"val":"D","secret":4}
{"val":"E","secret":5}
*/
const fs = require('fs');
const stream = require('stream');
const highland = require('highland');
const input = fs.createReadStream('input.txt');
const output = fs.createWriteStream('output.txt');
function readStreamThatCreatesData() {
return highland(input).split().map(JSON.parse);
}
class TransformerStream extends stream.Transform {
constructor(options) {
if (!options) {
options = {};
}
options.objectMode = true;
super(options);
}
_transform(item, enc, cb) {
if (item.secret) {
item.secret = 'removed';
}
item.val = item.val.toUpperCase();
this.push(item);
cb();
}
};
function removeSecret(item) {
delete item.secret;
return item;
}
function extractSecret(item) {
return item.secret;
}
const inputStream = readStreamThatCreatesData();
const secretStream = inputStream.fork().map(extractSecret);
const mainStream = inputStream.fork().map(removeSecret);
secretStream.zip(highland(mainStream.pipe(new TransformerStream())))
.map((combined) => {
const secret = combined[0];
const item = combined[1];
item.secret = secret;
return JSON.stringify(item) + '\n';
})
.pipe(output);
我是一名优秀的程序员,十分优秀!