gpt4 book ai didi

python - multiprocessing.Pool 在关闭/加入后无限期挂起

转载 作者:行者123 更新时间:2023-11-28 18:56:07 25 4
gpt4 key购买 nike

我有一个不确定的生产问题,其中 multiprocessing.Pool 卡住并且永远不会从 join 返回。

我已经设法将问题减少到这个小例子,并让它在某种程度上可靠地挂起。

工作示例:

#!/usr/bin/env python3
import os
import time
import multiprocessing.pool

def run_task(i):
print(f'[{os.getpid()}] task({i}) complete')

if __name__ == '__main__':
tasks = iter(range(10))
processes = 4

pool = multiprocessing.pool.Pool(processes=processes, maxtasksperchild=1)
running = []
while True:
try:
running = [ f for f in running if not f.ready() ]
avail = processes - len(running)
if avail:
for _ in range(avail):
i = next(tasks)
print(f'[{os.getpid()}] add task({i})')
future = pool.apply_async(run_task, ( i, ))
running.append(future)
else:
time.sleep(0.1)
except StopIteration:
print(f'[{os.getpid()}] all tasks scheduled')
break

print(f'[{os.getpid()}] close and join pool')
pool.close()
pool.join()
print(f'[{os.getpid()}] all done')

大概是时间问题之一,因为失败是不确定的。因此,我必须循环运行它才能使其挂起(尽管根据我的经验,它会在前几次迭代中的一个上挂起)。

for i in {1..100}; do ./test.py; done   

挂起时的输出:

[15243] add task(0)
[15243] add task(1)
[15243] add task(2)
[15243] add task(3)
[15244] task(0) complete
[15245] task(1) complete
[15246] task(2) complete
[15247] task(3) complete
[15243] add task(4)
[15243] add task(5)
[15251] task(4) complete
[15243] add task(6)
[15243] add task(7)
[15252] task(5) complete
[15253] task(6) complete
[15243] add task(8)
[15243] add task(9)
[15243] all tasks scheduled
[15255] task(8) complete
[15256] task(9) complete
[15243] close and join pool <-- hangs here indefinitely

主进程的gdb回溯:

#0  0x00007fb132b7c6c2 in __GI___waitpid (pid=22857, stat_loc=0x7fff8ef55d5c, options=0) at ../sysdeps/unix/sysv/linux/waitpid.c:30
#1 0x00000000005d10e5 in os_waitpid_impl (module=<optimised out>, options=0, pid=22857) at ../Modules/posixmodule.c:6941
#2 os_waitpid.lto_priv () at ../Modules/clinic/posixmodule.c.h:2995
#3 0x000000000050a84f in _PyCFunction_FastCallDict (kwargs=<optimised out>, nargs=<optimised out>, args=<optimised out>, func_obj=0x7fb132fea0d8) at ../Objects/methodobject.c:234
#4 _PyCFunction_FastCallKeywords (kwnames=<optimised out>, nargs=<optimised out>, stack=<optimised out>, func=<optimised out>) at ../Objects/methodobject.c:294
#5 call_function.lto_priv () at ../Python/ceval.c:4851

子进程的 gdb 回溯:

#0  0x00007fb1328896d6 in futex_abstimed_wait_cancelable (private=0, abstime=0x0, expected=0, futex_word=0x1c68e40) at ../sysdeps/unix/sysv/linux/futex-internal.h:205
#1 do_futex_wait (sem=sem@entry=0x1c68e40, abstime=0x0) at sem_waitcommon.c:111
#2 0x00007fb1328897c8 in __new_sem_wait_slow (sem=0x1c68e40, abstime=0x0) at sem_waitcommon.c:181
#3 0x00000000005ab535 in PyThread_acquire_lock_timed (intr_flag=<optimised out>, microseconds=<optimised out>, lock=<optimised out>) at ../Python/thread_pthread.h:386
#4 PyThread_acquire_lock () at ../Python/thread_pthread.h:595
#5 0x0000000000446bf1 in _enter_buffered_busy (self=self@entry=0x7fb13307aa98) at ../Modules/_io/bufferedio.c:292
#6 0x00000000004ce743 in buffered_flush.lto_priv () at ../Python/thread_pthread.h:416

实现说明:

仅在工作人员可用时安排任务:

每个任务的优先级在等待执行时都可能发生变化,所以我不能在一开始就将所有任务排入队列。

因此 running 列表和检查 AsyncResult.ready 以确定我是否可以执行另一个任务

ma​​xtasksperchild=1:

任务泄漏内存,所以为了在每个任务运行后回收丢失的内存,我作弊并使用 maxtasksperchild=1


观察:

sleep vs 忙碌等待:

有趣的是,如果我将 time.sleep(0.1) 更改为忙等待,挂起就会消失。

wait = time.time() + 0.1
while time.time() < wait:
pass

是否有可能在父进程 sleep 期间错过来自子进程的信号?

ma​​xtasksperchild=1:

如果我重用原来的子进程,挂起就会消失。


所以这似乎是进程在每个任务完成后被销毁的事实与父级休眠之间的某种相互作用。

作为生产中的快速修复,我已将 sleep 更改为忙碌的等待,但这感觉就像一个丑陋的 hack,我想了解到底是什么导致了挂起。

  • 为什么池永远不会从 join 返回?
  • 为什么忙着等待而不是 sleep “解决”了这个问题?
  • 为什么每次“解决”问题时重用流程而不是创建新流程?

最佳答案

我认为这个问题是有异常(exception)的,从技术上讲它不应该存在并且可能已经在更高版本的 python 中得到修复。

[15243] add task(4)
[15243] add task(5)
[15251] task(4) complete
[15243] add task(6)
[15243] add task(7)
[15252] task(5) complete
[15253] task(6) complete
[15243] add task(8)
[15243] add task(9)
[15243] all tasks scheduled <-- Exception Called but [15254] or task(7) is not completed
[15255] task(8) complete
[15256] task(9) complete
[15243] close and join pool <-- hangs here indefinitely

在异常调用点发生的事情可能导致 task(7) 进入奇怪的状态,apply_async 允许回调,这意味着 3.6 可能以不稳定的方式创建线程。

阻塞等待意味着你的主进程没有休眠,处理起来可能会更快。检查增加等待时间或使用 apply() 是否有所不同。

我不确定为什么重用“修复”了问题,但可能只是访问时间更快且更容易处理。

关于python - multiprocessing.Pool 在关闭/加入后无限期挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58843576/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com