gpt4 book ai didi

使用多处理运行 ffmpeg 的 Python

转载 作者:行者123 更新时间:2023-12-04 22:54:23 27 4
gpt4 key购买 nike

我正在尝试运行 ffmpeg带有 multiprocessing 的命令在并行任务中。
我的 python ffmpeg要并行化的调用如下:

def load_audio(args, kwargs):
url = args

start = kwargs["start"]
end = kwargs["end"]
sr = kwargs["sr"]
n_channels = kwargs["n_channels"]
mono = kwargs["mono"]

cmd = ["ffmpeg", "-i", url, "-acodec", "pcm_s16le", "-ac", str(n_channels), "-ar", str(sr), "-ss", _to_ffmpeg_time(start), "-t", _to_ffmpeg_time(end - start), "-sn", "-vn", "-y", "-f", "wav", "pipe:1"]

process = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10 ** 8
)
buffer = process.stdout
waveform = np.frombuffer(buffer=process.stdout, dtype=np.uint16, offset=8 * 44)
waveform = waveform.astype(dtype)
return waveform
然后我通过偏移 60 seconds 读取音频:
_THREAD_POOL = BoundedThreadPoolExecutor(max_workers=NTHREADS)
tasks = []
cur_start = start
for i in range(NTHREADS):
msg = {'start':cur_start,
'end':min(cur_start+60,end)}
t = execute_callback(load_audio,
'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3',
msg)
cur_start+=60
tasks.append(t)
在哪里 execute_callback将线程提交到池中:
def execute_callback(fn, args, kwargs):
try:
futures_thread = _THREAD_POOL.submit(fn, args, kwargs)
return futures_thread
except Exception as e:
return None
我终于检索到结果,并连接到 numpy数组(将转到 soundfile 进行读取)
futures_results = get_results_as_completed(tasks, return_when=ALL_COMPLETED)
waveform = []
for i,r in enumerate(futures_results):
if not i:
waveform = r
print(type(r))
else:
waveform = np.append(waveform,r)
在哪里 get_results_as_completed
def get_results_as_completed(futures, return_when=ALL_COMPLETED):
finished = as_completed(futures)
for f in finished:
try:
yield f.result()
except Exception as e:
pass
我在哪里使用有界池执行器类 herehere .
我正在使用 as_completed检索已完成状态的 future ,这会导致输出不保留在输入顺序中,而是“完成”顺序中,这会导致音频输出错误。我的问题是
  • ffmpeg future 实际上是并行执行的?在我的测试中下载整个音频,例如:
         args = 'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3'
    kwargs = {'start':start,
    'end':end}
    waveform = load_audio(args,kwargs)
  • 是否可以保留结果的输入顺序 没有 使用信号量,但仅限 multiprocessing函数(map 可能是?)。如果是这样,怎么做?
  • 最佳答案

    我想如果你使用 BoundedThreadPoolExecutor您在技术上是多线程的,然后每个线程都在运行一个进程(稍后会详细介绍)。
    无论如何,您的功能 execute_callback ,它的名字让我有点困惑,实际上向池提交了一个任务并返回一个 Future然后附加到列表 tasks 的实例.然后你通过 tasksget_results_as_completed ,它会按完成顺序从您的任务中产生返回值。但这不是你想要的。
    因此,首先回答您的第二个问题:如果您不希望结果按完成顺序,请不要使用函数 as_completed .而是调用:

    def get_results(futures):
    for f in futures:
    try:
    yield f.result()
    except Exception as e:
    pass
    回答您的第一个问题:对于 CPU 密集型 Python 代码,多线程通常不是您想要的,因为全局解释器锁存在争用。但是由于每个线程都在启动一个进程并等待进程结束,所以我认为您应该实现并行化,并且我认为没有理由因为我之前的回答而不应维护输入顺序。
    现在我的问题和评论:
  • 为什么BoundedThreadPoolExecutor而不是仅仅使用标准 ThreadPoolExecutor ?
  • 在具有 args 的参数签名的函数定义中和 kwargs作为论据,例如load_audio ,将这些参数指定为 *args 会更“正常”和 **kwargs然后你会这样调用这个函数:load_audio('https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3', start=start, end=end) .
  • 关于使用多处理运行 ffmpeg 的 Python,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68670906/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com