- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用下面的 python 脚本从实时流音频输入中获取来自谷歌语音 API 的预测。
问题是,我需要从谷歌语音 API 对每个话语进行预测,然后还将每个话语的音频保存到磁盘。
我不确定如何修改脚本以保存每个话语的实时音频并打印每个话语的结果而不是连续预测。
#!/usr/bin/env python
import os
import re
import sys
import time
from google.cloud import speech
import pyaudio
from six.moves import queue
# Audio recording parameters
STREAMING_LIMIT = 240000 # 4 minutes
SAMPLE_RATE = 16000
CHUNK_SIZE = int(SAMPLE_RATE / 10) # 100ms
api_key = r'path_to_json_file\google.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = api_key
RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[0;33m'
def get_current_time():
"""Return Current Time in MS."""
return int(round(time.time() * 1000))
class ResumableMicrophoneStream:
"""Opens a recording stream as a generator yielding the audio chunks."""
def __init__(self, rate, chunk_size):
self._rate = rate
self.chunk_size = chunk_size
self._num_channels = 1
self._buff = queue.Queue()
self.closed = True
self.start_time = get_current_time()
self.restart_counter = 0
self.audio_input = []
self.last_audio_input = []
self.result_end_time = 0
self.is_final_end_time = 0
self.final_request_end_time = 0
self.bridging_offset = 0
self.last_transcript_was_final = False
self.new_stream = True
self._audio_interface = pyaudio.PyAudio()
self._audio_stream = self._audio_interface.open(
format=pyaudio.paInt16,
channels=self._num_channels,
rate=self._rate,
input=True,
frames_per_buffer=self.chunk_size,
# Run the audio stream asynchronously to fill the buffer object.
# This is necessary so that the input device's buffer doesn't
# overflow while the calling thread makes network requests, etc.
stream_callback=self._fill_buffer,
)
def __enter__(self):
self.closed = False
return self
def __exit__(self, type, value, traceback):
self._audio_stream.stop_stream()
self._audio_stream.close()
self.closed = True
# Signal the generator to terminate so that the client's
# streaming_recognize method will not block the process termination.
self._buff.put(None)
self._audio_interface.terminate()
def _fill_buffer(self, in_data, *args, **kwargs):
"""Continuously collect data from the audio stream, into the buffer."""
self._buff.put(in_data)
return None, pyaudio.paContinue
def generator(self):
"""Stream Audio from microphone to API and to local buffer"""
while not self.closed:
data = []
if self.new_stream and self.last_audio_input:
chunk_time = STREAMING_LIMIT / len(self.last_audio_input)
if chunk_time != 0:
if self.bridging_offset < 0:
self.bridging_offset = 0
if self.bridging_offset > self.final_request_end_time:
self.bridging_offset = self.final_request_end_time
chunks_from_ms = round((self.final_request_end_time -
self.bridging_offset) / chunk_time)
self.bridging_offset = (round((
len(self.last_audio_input) - chunks_from_ms)
* chunk_time))
for i in range(chunks_from_ms, len(self.last_audio_input)):
data.append(self.last_audio_input[i])
self.new_stream = False
# Use a blocking get() to ensure there's at least one chunk of
# data, and stop iteration if the chunk is None, indicating the
# end of the audio stream.
chunk = self._buff.get()
self.audio_input.append(chunk)
if chunk is None:
return
data.append(chunk)
# Now consume whatever other data's still buffered.
while True:
try:
chunk = self._buff.get(block=False)
if chunk is None:
return
data.append(chunk)
self.audio_input.append(chunk)
except queue.Empty:
break
yield b''.join(data)
def listen_print_loop(responses, stream):
"""Iterates through server responses and prints them.
The responses passed is a generator that will block until a response
is provided by the server.
Each response may contain multiple results, and each result may contain
multiple alternatives; Here we
print only the transcription for the top alternative of the top result.
In this case, responses are provided for interim results as well. If the
response is an interim one, print a line feed at the end of it, to allow
the next result to overwrite it, until the response is a final one. For the
final one, print a newline to preserve the finalized transcription.
"""
for response in responses:
if get_current_time() - stream.start_time > STREAMING_LIMIT:
stream.start_time = get_current_time()
break
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
result_seconds = 0
result_nanos = 0
if result.result_end_time.seconds:
result_seconds = result.result_end_time.seconds
if result.result_end_time.nanos:
result_nanos = result.result_end_time.nanos
stream.result_end_time = int((result_seconds * 1000)
+ (result_nanos / 1000000))
corrected_time = (stream.result_end_time - stream.bridging_offset
+ (STREAMING_LIMIT * stream.restart_counter))
# Display interim results, but with a carriage return at the end of the
# line, so subsequent lines will overwrite them.
if result.is_final:
sys.stdout.write(GREEN)
sys.stdout.write('\033[K')
sys.stdout.write(str(corrected_time) + ': ' + transcript + '\n')
stream.is_final_end_time = stream.result_end_time
stream.last_transcript_was_final = True
# Exit recognition if any of the transcribed phrases could be
# one of our keywords.
if re.search(r'\b(exit|quit)\b', transcript, re.I):
sys.stdout.write(YELLOW)
sys.stdout.write('Exiting...\n')
stream.closed = True
break
else:
sys.stdout.write(RED)
sys.stdout.write('\033[K')
sys.stdout.write(str(corrected_time) + ': ' + transcript + '\r')
stream.last_transcript_was_final = False
def main():
"""start bidirectional streaming from microphone input to speech API"""
client = speech.SpeechClient()
config = speech.types.RecognitionConfig(
encoding=speech.enums.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=SAMPLE_RATE,
language_code='en-US',
max_alternatives=1)
streaming_config = speech.types.StreamingRecognitionConfig(
config=config,
interim_results=True)
mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE)
print(mic_manager.chunk_size)
sys.stdout.write(YELLOW)
sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n')
sys.stdout.write('End (ms) Transcript Results/Status\n')
sys.stdout.write('=====================================================\n')
with mic_manager as stream:
while not stream.closed:
sys.stdout.write(YELLOW)
sys.stdout.write('\n' + str(
STREAMING_LIMIT * stream.restart_counter) + ': NEW REQUEST\n')
stream.audio_input = []
audio_generator = stream.generator()
requests = (speech.types.StreamingRecognizeRequest(
audio_content=content)for content in audio_generator)
responses = client.streaming_recognize(streaming_config,
requests)
# Now, put the transcription responses to use.
listen_print_loop(responses, stream)
if stream.result_end_time > 0:
stream.final_request_end_time = stream.is_final_end_time
stream.result_end_time = 0
stream.last_audio_input = []
stream.last_audio_input = stream.audio_input
stream.audio_input = []
stream.restart_counter = stream.restart_counter + 1
if not stream.last_transcript_was_final:
sys.stdout.write('\n')
stream.new_stream = True
if __name__ == '__main__':
main()
最佳答案
我很难理解这段代码中发生的所有事情,而且我不想支付许可证来尝试它,但这里有一些想法。也许其他人会发现它们很有用并且可以进一步帮助您。
检测句子的结尾
首先,将句子与语音分开的一个大问题是,并非每个人都遵循相同的句子之间的停顿。有些人会等待更长时间,而另一些人会直接耕种下一个。有些人在句子中也会停顿。如果您使用相对简单的方法(例如尝试检测停顿)来检测音频数据中的句子结尾,则这会使得从音频数据中检测句子的结尾变得困难。
我能想象到的最好方法是使用您从 Google Speech API 返回的解释并在结束标点符号( !
、 ?
、 .
)上进行拆分。然后,您的问题将减少到将返回的响应与特定的音频数据块相关联。
看起来你可以通过 None
回到你的生成器,它已经优雅地结束了,所以应该不会太糟糕。当您决定一个句子结束时,您可能希望保存生成转录本的任何音频数据块。
这可能很难,因为当接收到更多音频时,Google Speech API 可能会追溯决定一个完整的句子实际上不是完整的,而是更长句子的一部分,因此您也需要注意这一点。
保存音频数据
至于保存原始音频数据,一旦您知道哪些块适用于哪些转录,只需将它们全部附加到列表中(例如 list_of_chunks
)并使用 wave
:
import wave
with wave.open("foo.wav", 'wb') as f:
f.setnchannels(self._num_channels)
f.setsampwidth(audio.get_sample_size(pyaudio.paInt16))
f.setframerate(self._rate)
f.writeframes(b''.join(list_of_chunks))
您当然必须制作
num_channels
和
rate
如果您在
ResumableMicrophoneStream
之外执行此操作,则可以访问类(class)。
关于python - 如何从谷歌语音 api 获取每个话语的结果并将每个音频话语 block 单独保存为 wav 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63094643/
如果已知该确切样本存在于 wav 中的某处(但可能与其他声音混合),是否可以使用 FFT 找到较长 wav 中出现的小 wav 样本? 编辑 (收到两个回复后):如果我有一个包含所有已知声音的库,这些
我对 .NET 中的音频完全陌生,所以请多多包涵。 我的目标是创建一个具有两个 channel 的 wav 文件。左声道将包含语音消息(使用 SpeechSynthesizer 生成的流),右声道需要
我的大部分信息都来自其他stackoverflow帖子,但没有一个真正有用。 import UIKit import AVFoundation class FaceButtonSc
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 3 年前。
这可能是一个非常简单的问题;我将一个单声道 WAV 文件转换为一个 short[] 数组,并且我有一个将其写回 WAV 文件的函数。一切正常。 (writeBuffer 是 short[] 数组) b
我们的应用程序需要知道它加载的音频文件的样本数。我们使用的库可以可靠地确定采样率,但不能确定样本数。我们是否可以仅从文件大小和采样率来计算样本数? 最佳答案 马克说什么。不,通常您需要解释标题。但是,
我正在用java做一个项目,需要我加密wave文件。那么,是否有一个直接的过程可以将波形文件转换为二进制文件并返回?我将对二进制数据应用加密算法。 最佳答案 是的。 File file = new F
我想知道如何从 .wav 文件中获取样本以执行两个 .wav 文件的窗口连接。 谁能告诉我怎么做? 最佳答案 wave标准库的模块是关键:当然在代码顶部的 import wave 之后,wave.op
我有一个几分钟长的 .wav 文件,我想将其分成不同的 10 秒 .wav 文件。 到目前为止,这是我的 python 代码: import wave import math def main(fil
我在 ffmpeg 中使用以下命令合并多个 wav 文件: -f concat -safe 0 -i /storage/emulated/0/AudioClipsForSpeakerRecogniti
我正在尝试用python实现主动降噪。我的项目由两组代码组成: 录音代码 声音过滤代码 我的目标是当您运行该程序时,它将开始通过麦克风录音。录音完成后,会生成一个名为“file1.wav”的保存文件,
我正在尝试制作一个音乐识别系统。我担心我可能没有按照预期读取 wav 样本,而且我可能会应用错误的窗口大小来进行 FFT 和其他操作。 如果你能帮我的话,那就太好了。 首先,我有一些关于 Wavs 中
如何使用 java 合并两个 wav 文件? 我试过了 this但它没有正常工作,他们还有其他方法吗? 最佳答案 如果您直接处理 wav 文件的字节,您可以在任何编程语言中使用相同的策略。对于此示例,
尝试为我的 previous question 找到解决方法,我想将用 byte[](具有 wav header )编写的 16k 8 位单声道 wav 转换为 8k 8 位单声道流/字节 []。 是
目前我正在使用一个语音到文本的翻译模型,该模型采用 .wav 文件并将音频中的可听语音转换为文本转录本。该模型之前曾用于直接录制的 .wav 音频录音。但是现在我正在尝试对视频中最初出现的音频做同样的
试图在 python 中将 wav 文件转换为 wav uLaw。 使用 pydub 的 AudioSegment,我可以使用以下命令转换为 mp3: AudioSegment.from_wav(fr
我在 xcode 项目中添加了 LibFlac。然后我在我的项目中添加了来自 Libflac 的decode/main.c。我通过了 infile.flac 并运行了项目的可执行文件,但它给出了以下错
大家好,感谢您的阅读。 我想使用 Python 的 scipy.io.wavfile 对一首歌进行一些分析。由于我只有 .mp3 格式的歌曲,因此我使用 ffmpeg 将文件转换为 .wav,方法如下
我需要连接两个音频波,以便最终输出的音频波应该有一个更平滑的交汇点。我的意思是,在连接点,假设 10 秒钟,第一个音频应该开始淡出,而另一个音频开始拾取。 我已经能够连接两个音频文件并生成单个输出,但
我需要将一个 wav 文件转换为 8000Hz 16 位单声道 Wav。我已经有一个代码,它适用于 NAudio 库,但我想使用 MemoryStream 而不是临时文件。 using System.
我是一名优秀的程序员,十分优秀!