gpt4 book ai didi

node.js - 从内存消耗低的流中提取二进制值

转载 作者:搜寻专家 更新时间:2023-10-31 22:25:09 24 4
gpt4 key购买 nike

我正在使用 ExpressJS 构建一个 NodeJS 服务器,该服务器处理通过 POST 请求 从桌面应用程序发送的数据(50KB>100MB)待处理并退回。桌面应用程序 gzip 在发送之前压缩数据(50KB 变成 4KB)。

我希望服务器解压缩数据,从数据中提取值(字符串、整数、字符、数组、json 等),处理该数据,然后用处理后的数据进行响应。

我从这个开始:

apiRoute.route("/convert").post(bodyParser.raw({limit: '100Mb'}), (req, res) =>{
let outputData;
//extract values from req.body Buffer and do math on them.
//save processed data in outputData

res.json({
status: true,
data: outputData
});
});

这是有效的,因为 body-parser 将数据解压缩到存储在内存中的 Buffer req.body 中。这是我的主要问题......内存使用。我不想将整个数据集存储在内存中。


为了解决这个问题,我删除了 body-parser,而是将请求流直接通过管道传输到 zlib 转换流中:

apiRoute.route("/convert").post((req, res) =>{
req.pipe(zlib.createGunzip());
});

现在的问题是我不知道如何从流中提取二进制值。


这是我希望能够做到的:

apiRoute.route("/convert").post((req, res) =>{
let binaryStream = new stream.Transform();

req
.pipe(zlib.createGunzip())
.pipe(binaryStream);

let aValue = binaryStream.getBytes(20);//returns 20 bytes
let bValue = binaryStream.getBytes(20000);//returns the next 20000 bytes
//etc...

});

但是我不知道有什么方法可以做到这一点。类似 Dissolve 的模块很接近,但是它们要求您提前设置解析逻辑,并且所有抓取的值都存储在内存中。

另外,我不知道如何在不将其全部加载到内存的情况下响应 outputData。


所以我的问题是,我该如何...

  • 以我自己的速率异步读取流中的数据并提取其中的值
  • 将处理后的数据发送回桌面应用程序,而不将其全部放入内存

最佳答案

我解决了我自己的问题。我不是 100% 确信这是实现此目标的最佳方法,因此我愿意接受建议。

我做了一个 stream.Transform 的子类并实现了 _transform 方法。我发现下一个数据 block 只有在 _transform 回调被调用时才会得到输入。知道这一点后,我将该回调函数存储为一个属性,并且仅在需要下一个 block 时调用它。

getBytes(size) 是一种方法,它将从当前 block (也保存为属性)中获取指定数量的字节,并在需要下一个 block 时调用之前保存的回调。这是递归完成的,以考虑不同大小的 block 和不同数量的请求字节。

然后结合使用 async/await 和 promises,我能够使整个过程保持异步 (afaik) 和背压。

const {Transform} = require('stream'),
events = require('events');

class ByteStream extends Transform{

constructor(options){
super(options);

this.event_emitter = new events.EventEmitter();
this.hasStarted = false;
this.hasEnded = false;
this.currentChunk;
this.nextCallback;
this.pos = 0;

this.on('finish', ()=>{
this.hasEnded = true;
this.event_emitter.emit('chunkGrabbed');
});
}

_transform(chunk, enc, callback){
this.pos = 0;
this.currentChunk = chunk;
this.nextCallback = callback;

if(!this.hasStarted){
this.hasStarted = true;
this.event_emitter.emit('started');
}
else{
this.event_emitter.emit('chunkGrabbed');
}
}

doNextCallback(){
return new Promise((resolve, reject) =>{
this.event_emitter.once('chunkGrabbed', ()=>{resolve();});
this.nextCallback();
});
}

async getBytes(size){
if(this.pos + size > this.currentChunk.length)
{
let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);

if(!this.hasEnded)
{
var newSize = size-(this.currentChunk.length - this.pos);
//grab next chunk
await this.doNextCallback();
if(!this.hasEnded){
this.pos = 0;
let recurseBytes; await this.getBytes(newSize).then(bytes => {recurseBytes = bytes;});
bytes = Buffer.concat([bytes, recurseBytes]);
}
}

return bytes;
}
else{
let bytes = this.currentChunk.slice(this.pos, this.pos+size);
this.pos += size;
return bytes;
}
}
}

module.exports = {
ByteStream : ByteStream
}

我现在的 express 路线是:

apiRoute.route("/convert").post((req, res)=>{

let bStream = new ByteStream({});
let gStream = zlib.createGunzip();

bStream event_emitter.on('started', async () => {
console.log("started!");

let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});
console.log(myValue.length);
});

req
.pipe(gStream)
.pipe(bStream);
});

通过检查事件 started,我可以知道第一个 block 何时流式传输到 bStream。从那里开始,只需使用我想要的字节数调用 getBytes(),然后将 promise 的值分配给变量即可。它可以满足我的需要,尽管我还没有进行任何严格的测试。

关于node.js - 从内存消耗低的流中提取二进制值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55365136/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com