gpt4 book ai didi

Python 3.4 多处理队列比 Pipe 快,出乎意料

转载 作者:IT老高 更新时间:2023-10-28 20:48:49 25 4
gpt4 key购买 nike

我正在做一个从 udp 套接字接收样本的音频播放器,一切正常。但是当我实现了一个 Lost Concealment 算法时,播放器未能以异常(exception)的速率保持沉默(每 10 毫秒发送一个包含多个 160 字节的列表)。

当使用 pyaudio 播放音频时,使用阻塞调用 write 播放一些样本,我注意到它在样本持续时间内平均阻塞。所以我创建了一个新的专用流程来播放样本。

主进程处理音频的输出流,并使用 multiprocessing.Pipe 将结果发送到该进程。我决定使用 multiprocessing.Pipe 因为它应该比其他方式更快。

不幸的是,当我在虚拟机上运行程序时,比特率是我在快速 PC 上获得的一半,这并没有达到目标比特率。

经过一些测试,我得出结论,导致延迟的原因是 Pipe 的函数 send

我做了一个简单的基准测试脚本(见下文)来查看传输到进程的各种方法之间的差异。该脚本不断发送 [b'\x00'*160] 5 秒钟,并计算总共发送了多少字节字节对象。我测试了以下发送方法:“不发送”、multiprocessing.Pipe、multiprocessing.Queue、multiprocessing.Manager、multiprocessing.Listener/Client,最后是socket.socket:

我的“快速”PC 运行窗口 7 x64 的结果:

test_empty     :     1516076640
test_pipe : 58155840
test_queue : 233946880
test_manager : 2853440
test_socket : 55696160
test_named_pipe: 58363040

VirtualBox 的 VM guest 运行 Windows 7 x64 的结果,主机运行 Windows 7 x64:

test_empty     :     1462706080
test_pipe : 32444160
test_queue : 204845600
test_manager : 882560
test_socket : 20549280
test_named_pipe: 35387840

使用的脚本:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time

FS = "{:<15}:{:>15}"


def test_empty():
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]

sent += len(data)
if time.time()-s >= 5:
break
print(FS.format("test_empty", sent))


def pipe_void(pipe_in):
while True:
msg = pipe_in.recv()
if msg == []:
break


def test_pipe():
pipe_out, pipe_in = Pipe()
p = Process(target=pipe_void, args=(pipe_in,))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
pipe_out.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
pipe_out.send([])
p.join()
print(FS.format("test_pipe", sent))


def queue_void(q):
while True:
msg = q.get()
if msg == []:
break


def test_queue():
q = Queue()
p = Process(target=queue_void, args=(q,))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
q.put(lst)
sent += len(data)
if time.time()-s >= 5:
break
q.put([])
p.join()

print(FS.format("test_queue", sent))


def manager_void(l, lock):
msg = None
while True:
with lock:
if len(l) > 0:
msg = l.pop(0)
if msg == []:
break


def test_manager():
with Manager() as manager:
l = manager.list()
lock = manager.Lock()
p = Process(target=manager_void, args=(l, lock))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
with lock:
l.append(lst)
sent += len(data)
if time.time()-s >= 5:
break
with lock:
l.append([])
p.join()

print(FS.format("test_manager", sent))


def socket_void():
addr = ('127.0.0.1', 20000)
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break


def test_socket():
addr = ('127.0.0.1', 20000)
listener = Listener(addr, "AF_INET")
p = Process(target=socket_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()

print(FS.format("test_socket", sent))


def named_pipe_void():
addr = '\\\\.\\pipe\\Test'
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break


def test_named_pipe():
addr = '\\\\.\\pipe\\Test'
listener = Listener(addr, "AF_PIPE")
p = Process(target=named_pipe_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()

print(FS.format("test_named_pipe", sent))


if __name__ == "__main__":
test_empty()
test_pipe()
test_queue()
test_manager()
test_socket()
test_named_pipe()

问题

  • 如果 Queue 使用 Pipe 在这种情况下它比 Pipe 快吗?这与问题 Python multiprocessing - Pipe vs Queue 相矛盾
  • 如何保证从一个进程到另一个进程的恒定比特率流,同时具有低发送延迟?

更新 1

在我的程序中,在尝试使用队列而不是管道之后。 我得到了巨大的提升

在我的计算机上,使用 Pipes 我得到 +- 16000 B/s,使用 Queues 我得到 +-750 万 B/s。在虚拟机上,我的速度从 +-13000 B/s 提高到 650 万 B/s。使用 Queue instread 的 Pipe 字节数增加了大约 500 倍。

当然我不会每秒播放数百万字节,我只会播放正常的声音速率。 (在我的情况下是 16000 B/s,与上面的值一致)。
但关键是,我可以将速率限制在我想要的范围内,同时仍有时间完成其他计算(如从套接字接收、应用声音算法等)

最佳答案

我不能肯定地说,但我认为您要处理的问题是同步 I/O 与异步 I/O。我的猜测是 Pipe 以某种方式以同步方式结束,而 Queue 以异步方式结束。为什么一个是默认一种方式而另一种是另一种方式可能会更好地通过这个问题和答案来回答:

Synchronous/Asynchronous behaviour of python Pipes

关于Python 3.4 多处理队列比 Pipe 快,出乎意料,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26908909/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com