gpt4 book ai didi

Python多处理进程不会终止

转载 作者:行者123 更新时间:2023-12-01 01:08:43 25 4
gpt4 key购买 nike

当我尝试使用多处理库在 python 中实现并行操作时,我看到一些进程不会以非直观的方式终止。

我的程序包括:

  • 队列,用于进程之间的数据传输
  • 用户进程,使用通过队列接收的数据进行计算
  • 两个maker进程,生成数据并推送到队列

下面是一个简化的示例。 make_data 生成随机数并推送到队列,use_data 接收数据并计算平均值。总共生成了2*1000=2000个数字,并且全部使用。该代码按预期运行。毕竟,所有进程都会终止,队列中不会留下任何数据。

import random
from multiprocessing import Process, Queue

q = Queue(maxsize=10000)
def make_data(q):
for i in range(1000):
x = random.random()
q.put(x)
print("final line of make data")

def use_data(q):
i = 0
res = 0.0
while i < 2000:
if q.empty():
continue
i += 1
x = q.get()
res = res*(i-1)/i + x/i
print("iter %6d, avg = %.5f" % (i, res))

u = Process(target=use_data, args=(q,))
u.start()

p1 = Process(target=make_data, args=(q,))
p1.start()
p2 = Process(target=make_data, args=(q,))
p2.start()


u.join(timeout=10)
p1.join(timeout=10)
p2.join(timeout=10)
print(u.is_alive(), p1.is_alive(), p2.is_alive(), q.qsize())

结果:

final line of make data
final line of make data
iter 2000, avg = 0.49655
False False False 0

当我让制作者生成超出必要的数据时,事情就会发生变化。下面的代码与上面的不同之处仅在于每个制造商生成 5000 个数据,因此并未使用所有数据。运行时,它会打印最后几行的消息,但制造商进程永远不会终止(需要 Ctrl-C 才能停止)。

import random
from multiprocessing import Process, Queue

q = Queue(maxsize=10000)
def make_data(q):
for i in range(5000):
x = random.random()
q.put(x)
print("final line of make data")

def use_data(q):
i = 0
res = 0.0
while i < 2000:
if q.empty():
continue
i += 1
x = q.get()
res = res*(i-1)/i + x/i
print("iter %6d, avg = %.5f" % (i, res))

u = Process(target=use_data, args=(q,))
u.start()

p1 = Process(target=make_data, args=(q,))
p1.start()
p2 = Process(target=make_data, args=(q,))
p2.start()


u.join(timeout=10)
p1.join(timeout=10)
p2.join(timeout=10)
print(u.is_alive(), p1.is_alive(), p2.is_alive(), q.qsize())

结果:

final line of make data
final line of make data
iter 2000, avg = 0.49388
False True True 8000
# and never finish

在我看来,所有进程都会运行到最后,所以想知道为什么它们保持事件状态。有人可以帮助我理解这种现象吗?

我在 miniconda 发行版的 python 3.6.6 上运行了这个程序。

最佳答案

将项目放入队列的子进程在尝试实际将对象放入队列时被卡住。

普通的非多处理Queue对象完全在单个进程的地址空间中实现。在这种情况下,maxsize 是在 put() 调用阻塞之前可以排队的项目数。但是多处理Queue对象是使用IPC机制实现的;通常是管道。操作系统管道可以对有限数量的字节进行排队(典型限制为 8KB)。因此,当您的 use_data() 进程在仅出队 2000 个项目后终止时,make_data() 进程会阻塞,因为在退出时将本地排队的项目刷新到 IPC 时它们的 IPC channel 已满。这意味着它们实际上并没有退出,因此 join() 这些进程的尝试会无限期地阻塞。

实际上,您已经造成了僵局。发生这种情况的确切阈值取决于 IPC channel 可以缓冲的数据量。例如,在我的一台 Linux 服务器上,您的第二个示例可以可靠地工作,并将其插入到 u.join()p1.join() 之间:

for _ in range(4000):
q.get()

稍微减小该范围(例如,减小到 3990)会产生间歇性挂起。进一步减少范围(例如,减少到 3500)将始终挂起,因为至少有一个进程将数据填充到队列 block 中,同时将其项目刷新到 IPC channel 中。

这个故事的教训是什么?在尝试等待进程终止之前,始终完全耗尽多处理队列。

关于Python多处理进程不会终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55080541/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com