
python - Is multiprocessing.Pipe even slower than multiprocessing.Queue?

Reposted. Author: 太空狗. Updated: 2023-10-29 20:15:03

I tried to benchmark the speed of Queue versus Pipe from the multiprocessing package. I expected Pipe to be faster, since Queue uses a Pipe internally.

Strangely, Pipe is slower than Queue when sending large numpy arrays. What am I missing here?

Pipe:

import sys
import time
from multiprocessing import Process, Pipe
import numpy as np

NUM = 1000


def worker(conn):
    for task_nbr in range(NUM):
        conn.send(np.random.rand(400, 400, 3))
    sys.exit(1)


def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % msg_per_sec)

# Took 10.86s.

Queue:

import sys
import time
from multiprocessing import Process
from multiprocessing import Queue
import numpy as np

NUM = 1000


def worker(q):
    for task_nbr in range(NUM):
        q.put(np.random.rand(400, 400, 3))
    sys.exit(1)


def main():
    recv_q = Queue()
    Process(target=worker, args=(recv_q,)).start()
    for num in range(NUM):
        message = recv_q.get()


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % msg_per_sec)

# Took 6.86s.

Best Answer

You can run an experiment by putting the following into the Pipe code above:

def worker(conn):
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
    sys.exit(1)


def main():
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=worker, args=(child_conn,))
    p.start()
    p.join()

This measures how long it takes just to create the data for your test. On my system, it takes about 2.9 seconds.

Under the hood, the Queue object implements a buffer and a background send thread. The thread is still in the same process, but by using it, data creation does not have to wait for the system I/O to complete. It effectively parallelizes the two operations. Try modifying your Pipe code with a simple threaded implementation (disclaimer: the code here is for testing only and is not production-ready):

import sys
import time
import threading
from multiprocessing import Process, Pipe
import numpy as np

NUM = 1000


def worker(conn):
    _conn = conn
    _buf = []
    _wlock = threading.Lock()
    _sentinel = object()  # signals that we're done

    def thread_worker():
        while True:
            with _wlock:
                if not _buf:
                    continue
                obj = _buf.pop(0)
            if obj is _sentinel:
                return
            _conn.send(obj)

    t = threading.Thread(target=thread_worker)
    t.start()
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
        data[0][0][0] = task_nbr  # just for an integrity check
        with _wlock:
            _buf.append(data)
    with _wlock:
        _buf.append(_sentinel)
    t.join()
    sys.exit(1)


def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()
        assert num == message[0][0][0], 'Data was corrupted'


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % msg_per_sec)

On my machine, this takes 3.4 seconds to run, which is almost exactly the same as the Queue code above.

From https://docs.python.org/2/library/threading.html:

In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once... However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

The difference between Queue and Pipe is definitely an odd implementation detail until you dig into it.
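You can also observe the feeder thread's buffering directly within a single process (a minimal sketch, not from the original answer): q.put() returns almost immediately because the feeder thread pickles and writes the bytes to the underlying pipe in the background, while the caller keeps running.

```python
import time
from multiprocessing import Queue
import numpy as np

q = Queue()
data = np.random.rand(400, 400, 3)  # ~3.7 MB of float64

start = time.time()
q.put(data)  # returns almost immediately; a background feeder thread
             # serializes the array and writes it to the underlying pipe
put_elapsed = time.time() - start

out = q.get()  # blocks until the feeder thread has pushed the bytes through
print("put() returned in %.6f s" % put_elapsed)
print("round-trip shape:", out.shape)
```

In contrast, conn.send() on a Pipe does the pickling and writing synchronously in the calling thread, which is why the Pipe benchmark pays the full serialization plus I/O cost for every message.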

Regarding "python - Is multiprocessing.Pipe even slower than multiprocessing.Queue?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48353601/
