- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
- 期望的行为
- 实际行为
- 我试过的方法
- 复制步骤
- 研究
期望的行为
将从多个 api 请求接收到的多个可读流通过管道传输到单个可写流。
api 响应来自 ibm-watson 的 textToSpeech.synthesize()方法。
需要多个请求的原因是服务对文本输入有 5KB
限制。
因此,例如 18KB
的字符串需要四个请求才能完成。
实际行为
可写流文件不完整,乱码。
应用程序似乎“挂起”。
当我尝试在音频播放器中打开不完整的 .mp3
文件时,它说它已损坏。
打开和关闭文件的过程似乎会增加文件的大小——就像打开文件会以某种方式提示更多数据流入一样。
如果输入越大,不良行为就越明显,例如 4000 字节或更少的四个字符串。
我的尝试
我已经尝试了几种方法来使用 npm 包 combined-stream 将可读流通过管道传输到单个可写流或多个可写流, combined-stream2 , multistream和 archiver它们都会导致文件不完整。我的最后一次尝试没有使用任何包,并显示在下面的 Steps To Reproduce
部分中。
因此,我质疑我的应用程序逻辑的每个部分:
01. What is the response type of a watson text to speech api request?
text to speech docs ,说api响应类型是:
Response type: NodeJS.ReadableStream|FileObject|Buffer
我很困惑响应类型是三种可能的事情之一。
在我所有的尝试中,我一直假设它是一个可读流
。
02. Can I make multiple api requests in a map function?
03. Can I wrap each request within a
promise()
and resolve theresponse
?04. Can I assign the resulting array to a
promises
variable?05. Can I declare
var audio_files = await Promise.all(promises)
?06. After this declaration, are all responses 'finished'?
07. How do I correctly pipe each response to a writable stream?
08. How do I detect when all pipes have finished, so I can send file back to client?
对于问题 2 - 6,我假设答案是"is"。
我认为我的失败与问题 7 和 8 有关。
复制步骤
您可以使用由四个随机生成的文本字符串组成的数组来测试此代码,这些字符串的字节大小分别为 3975
、3863
、3974
和3629
字节 - here is a pastebin of that array .
// route handler
app.route("/api/:api_version/tts")
.get(api_tts_get);
// route handler middleware
const api_tts_get = async (req, res) => {
var query_parameters = req.query;
var file_name = query_parameters.file_name;
var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV
var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root
// for each string in an array, send it to the watson api
var promises = text_string_array.map(text_string => {
return new Promise((resolve, reject) => {
// credentials
var textToSpeech = new TextToSpeechV1({
iam_apikey: iam_apikey,
url: tts_service_url
});
// params
var synthesizeParams = {
text: text_string,
accept: 'audio/mp3',
voice: 'en-US_AllisonV3Voice'
};
// make request
textToSpeech.synthesize(synthesizeParams, (err, audio) => {
if (err) {
console.log("synthesize - an error occurred: ");
return reject(err);
}
resolve(audio);
});
});
});
try {
// wait for all responses
var audio_files = await Promise.all(promises);
var audio_files_length = audio_files.length;
var write_stream = fs.createWriteStream(`${relative_path}.mp3`);
audio_files.forEach((audio, index) => {
// if this is the last value in the array,
// pipe it to write_stream,
// when finished, the readable stream will emit 'end'
// then the .end() method will be called on write_stream
// which will trigger the 'finished' event on the write_stream
if (index == audio_files_length - 1) {
audio.pipe(write_stream);
}
// if not the last value in the array,
// pipe to write_stream and leave open
else {
audio.pipe(write_stream, { end: false });
}
});
write_stream.on('finish', function() {
// download the file (using absolute_path)
res.download(`${absolute_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
// delete the file (using relative_path)
fs.unlink(`${relative_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
});
});
});
} catch (err) {
console.log("there was an error getting tts");
console.log(err);
}
}
textToSpeech.synthesize(synthesizeParams)
.then(audio => {
audio.pipe(fs.createWriteStream('hello_world.mp3'));
})
.catch(err => {
console.log('error:', err);
});
据我所知,这似乎适用于单个请求,但不适用于多个请求。
研究
关于可读和可写流、可读流模式(流动和暂停)、'data'、'end'、'drain' 和 'finish' 事件、pipe()、fs.createReadStream() 和 fs。 createWriteStream()
Almost all Node.js applications, no matter how simple, use streams in some manner...
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream
let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
https://nodejs.org/api/stream.html#stream_api_for_stream_consumers
Readable streams have two main modes that affect the way we can consume them...they can be either in the
paused
mode or in theflowing
mode. All readable streams start in the paused mode by default but they can be easily switched toflowing
and back topaused
when needed...just adding adata
event handler switches a paused stream intoflowing
mode and removing thedata
event handler switches the stream back topaused
mode.
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
Here’s a list of the important events and functions that can be used with readable and writable streams
The most important events on a readable stream are:
The
data
event, which is emitted whenever the stream passes a chunk of data to the consumer Theend
event, which is emitted when there is no more data to be consumed from the stream.The most important events on a writable stream are:
The
drain
event, which is a signal that the writable stream can receive more data. Thefinish
event, which is emitted when all data has been flushed to the underlying system.
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
.pipe()
takes care of listening for 'data' and 'end' events from thefs.createReadStream()
.
https://github.com/substack/stream-handbook#why-you-should-use-streams
.pipe()
is just a function that takes a readable source stream src and hooks the output to a destination writable streamdst
https://github.com/substack/stream-handbook#pipe
The return value of the
pipe()
method is the destination stream
https://flaviocopes.com/nodejs-streams/#pipe
By default, stream.end() is called on the destination
Writable
stream when the sourceReadable
stream emits'end'
, so that the destination is no longer writable. To disable this default behavior, theend
option can be passed asfalse
, causing the destination stream to remain open:
https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options
The
'finish'
event is emitted after thestream.end()
method has been called, and all data has been flushed to the underlying system.
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
console.log('All writes are now complete.');
});
https://nodejs.org/api/stream.html#stream_event_finish
If you're trying to read multiple files and pipe them to a writable stream, you have to pipe each one to the writable stream and and pass
end: false
when doing it, because by default, a readable stream ends the writable stream when there's no more data to be read. Here's an example:
var ws = fs.createWriteStream('output.pdf');
fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);
https://stackoverflow.com/a/30916248
You want to add the second read into an eventlistener for the first read to finish...
var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
b.pipe(c)
}
https://stackoverflow.com/a/28033554
相关的谷歌搜索:
how to pipe multiple readable streams to a single writable stream? nodejs
涉及相同或相似主题的问题,没有权威答案(或可能“过时”):
How to pipe multiple ReadableStreams to a single WriteStream?
Piping to same Writable stream twice via different Readable stream
最佳答案
这里要解决的核心问题是异步性。您几乎拥有它:您发布的代码的问题是您将所有源流并行且无序地输送到目标流中。这意味着 data
block 将从不同的音频流中随机流出 - 即使您的 end
事件也会在没有 end
的情况下超越 pipe
> 过早关闭目标流,这可以解释为什么重新打开后它会增加。
您想要的是按顺序传递它们 - 您甚至在引用时发布了解决方案
You want to add the second read into an eventlistener for the first read to finish...
或作为代码:
a.pipe(c, { end:false });
a.on('end', function() {
b.pipe(c);
}
这会将源流按顺序通过管道传输到目标流中。
使用您的代码,这意味着将 audio_files.forEach
循环替换为:
await Bluebird.mapSeries(audio_files, async (audio, index) => {
const isLastIndex = index == audio_files_length - 1;
audio.pipe(write_stream, { end: isLastIndex });
return new Promise(resolve => audio.on('end', resolve));
});
注意 bluebird.js mapSeries 的用法在这里。
关于您的代码的进一步建议:
const
& let
而不是 var
并考虑使用 camelCase
进一步阅读,组合原生 Node 流的限制:https://github.com/nodejs/node/issues/93
关于node.js - 如何将多个可读流(从多个 api 请求)传输到单个可写流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57157632/
我在不同的硬件上测试 Cassandra 已经有一段时间了。 首先我有 2 个 CPU 和 6 GB RAM 然后我更改为 16 个 CPU 和 16 GB RAM(其中只有 6 GB 可供我的测试使
我只是想从二进制文件中读/写。我一直在关注 this教程,它的工作原理......除了它似乎正在将内容写入 txt 文件。我在测试的时候把文件命名为test.bin,但是记事本可以打开并正常显示,所以
我编写了一些简单的 Java 代码来从文本文件中读取字符串,将它们组合起来,然后将它们写回。 (有关输出没有变化的简化版本,请参见下面的片段) 问题是输入文件和输出文件中的特定字符(- 和 ...)是
我真的很感兴趣——你为什么要放 readln; 从键盘读取一些值到变量后的行?例如, repeat writeln('Make your choise'); read(CH); if (CH = '1
只要程序不允许同时写入存储在模块中的共享数据结构的相同元素,它是线程安全的吗?我知道这是一个菜鸟问题,但在任何地方都找不到明确解决的问题。情况如下: 在程序开始时,数据被初始化并存储在模块级可分配数组
我有一个数据结构,其操作可以归类为读取操作(例如查找)和写入操作(例如插入、删除)。这些操作应该同步,以便: 读操作不能在写操作执行时执行(除非在同一线程上),但是读操作可以与其他读操作并发执行。 在
我在Java套接字编程中有几个问题。 在读取客户端套接字中的输入流时,如果抛出IO异常;那么我们是否需要重新连接服务器套接字/再次初始化客户端套接字? 如果我们关闭输出流,它将关闭客户端套接字吗? 如
我正在尝试从客户端将结构写入带有套接字的服务器。 结构是: typedef struct R { int a; int b; double c; double d; double result[4];
我想知道是否可以通过 Javascript 从/向 Azure Active Directory 广告读取/写入数据。我读到 Azure 上有 REST 服务,但主要问题是生成与之通信的 token
我希望有人能提供完整的工作代码,允许在 Haskell 中执行以下操作: Read a very large sequence (more than 1 billion elements) of 32
我有一个任务是制作考试模拟器。我的意思是,在老师输入某些科目的分数后,学生输入他的名字、姓氏和出生,然后他决定学生是否通过科目。所以,我有一个问题,如何用新行写入文件文本并通过重写该文件来读取(逐行读
我需要编写巨大的文件(超过 100 万行)并将文件发送到另一台机器,我需要使用 Java BufferedReader 一次读取一行。 我使用的是 indetned Json 格式,但结果不太方便,
我在 Android 应用程序中有一个读写操作。在 onCreate 上,将读取文件并将其显示为编辑文本并且可以进行编辑。当按下保存按钮时,数据将被写入 onCreate 上读取的同一文件中。但我得到
我正在编写一个程序,该程序从一个文件读取输入,然后该程序将格式化数据并将其写入另一个文件。 输入文件: Christopher kardaras,10 N Brainard,Naperville,IL
我有一个 SCALA(+ JAVA) 代码,它以一定的速率读写。分析可以告诉我代码中每个方法的执行时间。如何衡量我的程序是否达到了最大效率?为了使我的代码优化,以便它以给定配置可能的最大速度读取。我知
嗨,我想知道如何访问 java/maven 中项目文件夹中的文件,我考虑过使用 src/main/resources,但有人告诉我,写入此目录中的文件是一个坏主意,并且应该只在项目的配置中使用,所以我
我想读\写一个具有以下结构的二进制文件: 该文件由“RECORDS”组成。每个“RECORD”具有以下结构:我将以第一条记录为例 (红色)起始字节:0x5A(始终为 1 字节,固定值 0x5A) (绿
我想制作一个C程序,它将用一些参数来调用;每个参数将代表一个文件名,我想在每个参数中写一些东西。 FILE * h0; h0 = fopen(argv[0],"w"); char buff
我有一个包含团队详细信息的文件。我需要代码来读取文件,并将获胜百分比写入第二个文件。我还需要使用指示的搜索功能来搜索团队的具体信息。该代码未写入百分比文件。当菜单显示时,第一个文件的内容被打印,但代码
我正在使用 read() 和 write() 函数来处理我的类,并且我正在尝试使用一个函数来写入它所读取的内容以及我作为参数给出的前面的内容。 例如,我想给出 10 作为我的程序的参数 int mai
我是一名优秀的程序员,十分优秀!