gpt4 book ai didi

node.js - 如何将多个可读流(从多个 api 请求)传输到单个可写流?

转载 作者:IT老高 更新时间:2023-10-28 23:21:34 24 4
gpt4 key购买 nike

- 期望的行为
- 实际行为
- 我试过的方法
- 复制步骤
- 研究


期望的行为

将从多个 api 请求接收到的多个可读流通过管道传输到单个可写流。

api 响应来自 ibm-watson 的 textToSpeech.synthesize()方法。

需要多个请求的原因是服务对文本输入有 5KB 限制。

因此,例如 18KB 的字符串需要四个请求才能完成。

实际行为

可写流文件不完整,乱码。

应用程序似乎“挂起”。

当我尝试在音频播放器中打开不完整的 .mp3 文件时,它说它已损坏。

打开和关闭文件的过程似乎会增加文件的大小——就像打开文件会以某种方式提示更多数据流入一样。

如果输入越大,不良行为就越明显,例如 4000 字节或更少的四个字符串。

我的尝试

我已经尝试了几种方法来使用 npm 包 combined-stream 将可读流通过管道传输到单个可写流或多个可写流, combined-stream2 , multistreamarchiver它们都会导致文件不完整。我的最后一次尝试没有使用任何包,并显示在下面的 Steps To Reproduce 部分中。

因此,我质疑我的应用程序逻辑的每个部分:

01. What is the response type of a watson text to speech api request?

text to speech docs ,说api响应类型是:

Response type: NodeJS.ReadableStream|FileObject|Buffer

我很困惑响应类型是三种可能的事情之一。

在我所有的尝试中,我一直假设它是一个可读流

02. Can I make multiple api requests in a map function?

03. Can I wrap each request within a promise() and resolve the response?

04. Can I assign the resulting array to a promises variable?

05. Can I declare var audio_files = await Promise.all(promises)?

06. After this declaration, are all responses 'finished'?

07. How do I correctly pipe each response to a writable stream?

08. How do I detect when all pipes have finished, so I can send file back to client?

对于问题 2 - 6,我假设答案是"is"。

我认为我的失败与问题 7 和 8 有关。

复制步骤

您可以使用由四个随机生成的文本字符串组成的数组来测试此代码,这些字符串的字节大小分别为 3975386339743629 字节 - here is a pastebin of that array .

// route handler
app.route("/api/:api_version/tts")
.get(api_tts_get);

// route handler middleware
const api_tts_get = async (req, res) => {

var query_parameters = req.query;

var file_name = query_parameters.file_name;
var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV

var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root

// for each string in an array, send it to the watson api
var promises = text_string_array.map(text_string => {

return new Promise((resolve, reject) => {

// credentials
var textToSpeech = new TextToSpeechV1({
iam_apikey: iam_apikey,
url: tts_service_url
});

// params
var synthesizeParams = {
text: text_string,
accept: 'audio/mp3',
voice: 'en-US_AllisonV3Voice'
};

// make request
textToSpeech.synthesize(synthesizeParams, (err, audio) => {
if (err) {
console.log("synthesize - an error occurred: ");
return reject(err);
}
resolve(audio);
});

});
});

try {
// wait for all responses
var audio_files = await Promise.all(promises);
var audio_files_length = audio_files.length;

var write_stream = fs.createWriteStream(`${relative_path}.mp3`);

audio_files.forEach((audio, index) => {

// if this is the last value in the array,
// pipe it to write_stream,
// when finished, the readable stream will emit 'end'
// then the .end() method will be called on write_stream
// which will trigger the 'finished' event on the write_stream
if (index == audio_files_length - 1) {
audio.pipe(write_stream);
}
// if not the last value in the array,
// pipe to write_stream and leave open
else {
audio.pipe(write_stream, { end: false });
}

});

write_stream.on('finish', function() {

// download the file (using absolute_path)
res.download(`${absolute_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
// delete the file (using relative_path)
fs.unlink(`${relative_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
});
});

});


} catch (err) {
console.log("there was an error getting tts");
console.log(err);
}

}

official example显示:

textToSpeech.synthesize(synthesizeParams)
.then(audio => {
audio.pipe(fs.createWriteStream('hello_world.mp3'));
})
.catch(err => {
console.log('error:', err);
});

据我所知,这似乎适用于单个请求,但不适用于多个请求。

研究

关于可读和可写流、可读流模式(流动和暂停)、'data'、'end'、'drain' 和 'finish' 事件、pipe()、fs.createReadStream() 和 fs。 createWriteStream()


Almost all Node.js applications, no matter how simple, use streams in some manner...

const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream

let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');

// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});

// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});

https://nodejs.org/api/stream.html#stream_api_for_stream_consumers


Readable streams have two main modes that affect the way we can consume them...they can be either in the paused mode or in the flowing mode. All readable streams start in the paused mode by default but they can be easily switched to flowing and back to paused when needed...just adding a data event handler switches a paused stream into flowing mode and removing the data event handler switches the stream back to paused mode.

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


Here’s a list of the important events and functions that can be used with readable and writable streams

enter image description here

The most important events on a readable stream are:

The data event, which is emitted whenever the stream passes a chunk of data to the consumer The end event, which is emitted when there is no more data to be consumed from the stream.

The most important events on a writable stream are:

The drain event, which is a signal that the writable stream can receive more data. The finish event, which is emitted when all data has been flushed to the underlying system.

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


.pipe() takes care of listening for 'data' and 'end' events from the fs.createReadStream().

https://github.com/substack/stream-handbook#why-you-should-use-streams


.pipe() is just a function that takes a readable source stream src and hooks the output to a destination writable stream dst

https://github.com/substack/stream-handbook#pipe


The return value of the pipe() method is the destination stream

https://flaviocopes.com/nodejs-streams/#pipe


By default, stream.end() is called on the destination Writable stream when the source Readable stream emits 'end', so that the destination is no longer writable. To disable this default behavior, the end option can be passed as false, causing the destination stream to remain open:

https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options


The 'finish' event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
console.log('All writes are now complete.');
});

https://nodejs.org/api/stream.html#stream_event_finish


If you're trying to read multiple files and pipe them to a writable stream, you have to pipe each one to the writable stream and and pass end: false when doing it, because by default, a readable stream ends the writable stream when there's no more data to be read. Here's an example:

var ws = fs.createWriteStream('output.pdf');

fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);

https://stackoverflow.com/a/30916248


You want to add the second read into an eventlistener for the first read to finish...

var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
b.pipe(c)
}

https://stackoverflow.com/a/28033554


Node 流的简史 - 部分 onetwo .


相关的谷歌搜索:

how to pipe multiple readable streams to a single writable stream? nodejs

涉及相同或相似主题的问题,没有权威答案(或可能“过时”):

How to pipe multiple ReadableStreams to a single WriteStream?

Piping to same Writable stream twice via different Readable stream

Pipe multiple files to one response

Creating a Node.js stream from two piped streams

最佳答案

这里要解决的核心问题是异步性。您几乎拥有它:您发布的代码的问题是您将所有源流并行且无序地输送到目标流中。这意味着 data block 将从不同的音频流中随机流出 - 即使您的 end 事件也会在没有 end 的情况下超越 pipe > 过早关闭目标流,这可以解释为什么重新打开后它会增加。

您想要的是按顺序传递它们 - 您甚至在引用时发布了解决方案

You want to add the second read into an eventlistener for the first read to finish...

或作为代码:

a.pipe(c, { end:false });
a.on('end', function() {
b.pipe(c);
}

这会将源流按顺序通过管道传输到目标流中。

使用您的代码,这意味着将 audio_files.forEach 循环替换为:

await Bluebird.mapSeries(audio_files, async (audio, index) => {  
const isLastIndex = index == audio_files_length - 1;
audio.pipe(write_stream, { end: isLastIndex });
return new Promise(resolve => audio.on('end', resolve));
});

注意 bluebird.js mapSeries 的用法在这里。

关于您的代码的进一步建议:

  • 您应该考虑使用 lodash.js
  • 您应该使用 const & let 而不是 var 并考虑使用 camelCase
  • 当您注意到“它对一个事件有效,但对多个事件失败”时,请始终思考:异步性、排列、竞争条件。

进一步阅读,组合原生 Node 流的限制:https://github.com/nodejs/node/issues/93

关于node.js - 如何将多个可读流(从多个 api 请求)传输到单个可写流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57157632/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com