gpt4 book ai didi

python - 使用 Pool().apply_async 进行大型计算期间的多处理死锁

转载 作者:行者123 更新时间:2023-12-02 08:25:51 26 4
gpt4 key购买 nike

我在 Python 3.7.3 中遇到一个问题,在处理大型计算任务时,我的多处理操作(使用队列、池和 apply_async)会死锁。

对于小型计算,这个多处理任务工作得很好。然而,当处理较大的进程时,多处理任务会停止或死锁,完全不会退出进程!我读到,如果您“无限制地增长队列,并且您正在加入一个正在等待队列中的空间的子进程,就会发生这种情况[...]您的主进程已停止等待该进程完成,并且它永不。” (Process.join() and queue don't work with large numbers)

我无法将这个概念转化为代码。我非常感谢有关重构我在下面编写的代码的指导:

import multiprocessing as mp

def listener(q, d): # task to queue information into a manager dictionary
while True:
item_to_write = q.get()
if item_to_write == 'kill':
break
foo = d['region']
foo.add(item_to_write)
d['region'] = foo # add items and set to manager dictionary


def main():
manager = mp.Manager()
q = manager.Queue()
d = manager.dict()
d['region'] = set()

pool = mp.Pool(mp.cpu_count() + 2)
watcher = pool.apply_async(listener, (q, d))
jobs = []
for i in range(24):
job = pool.apply_async(execute_search, (q, d)) # task for multiprocessing
jobs.append(job)
for job in jobs:
job.get() # begin multiprocessing task
q.put('kill') # kill multiprocessing task (view listener function)
pool.close()
pool.join()

print('process complete')


if __name__ == '__main__':
main()

最终,我希望完全防止死锁,以促进可以无限期运行直到完成的多处理任务。

<小时/>

下面是在 BASH 中退出死锁时的回溯

^CTraceback (most recent call last):
File "multithread_search_cl_gamma.py", line 260, in <module>
main(GEOTAG)
File "multithread_search_cl_gamma.py", line 248, in main
job.get()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 651, in get
Process ForkPoolWorker-28:
Process ForkPoolWorker-31:
Process ForkPoolWorker-30:
Process ForkPoolWorker-27:
Process ForkPoolWorker-29:
Process ForkPoolWorker-26:
self.wait(timeout)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 648, in wait
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
with self._rlock:
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
self._event.wait(timeout)
File "/Users/Ira/anaconda3/lib/python3.7/threading.py", line 552, in wait
Traceback (most recent call last):
Traceback (most recent call last):
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 352, in get
res = self._reader.recv_bytes()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
with self._rlock:
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
signaled = self._cond.wait(timeout)
File "/Users/Ira/anaconda3/lib/python3.7/threading.py", line 296, in wait
waiter.acquire()
KeyboardInterrupt
with self._rlock:
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
with self._rlock:
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
with self._rlock:
File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt

以下是更新后的脚本:

import multiprocessing as mp
import queue

def listener(q, d, stop_event):
while not stop_event.is_set():
try:
while True:
item_to_write = q.get(False)
if item_to_write == 'kill':
break
foo = d['region']
foo.add(item_to_write)
d['region'] = foo
except queue.Empty:
pass

time.sleep(0.5)
if not q.empty():
continue


def main():
manager = mp.Manager()
stop_event = manager.Event()
q = manager.Queue()
d = manager.dict()
d['region'] = set()
pool = mp.get_context("spawn").Pool(mp.cpu_count() + 2)
watcher = pool.apply_async(listener, (q, d, stop_event))
stop_event.set()
jobs = []
for i in range(24):
job = pool.apply_async(execute_search, (q, d))
jobs.append(job)
for job in jobs:
job.get()
q.put('kill')
pool.close()
pool.join()
print('process complete')


if __name__ == '__main__':
main()

更新::

execute_command 执行搜索所需的几个进程,因此我在 q.put() 所在的位置输入代码。

仅此一个脚本就需要 72 小时以上才能完成。每个多进程永远不会完成整个任务,而是单独工作并引用 manager.dict() 以避免重复任务。这些任务将一直工作,直到 manager.dict() 中的每个元组都已处理完毕。

def area(self, tup, housing_dict, q):
state, reg, sub_reg = tup[0], tup[1], tup[2]
for cat in housing_dict:
"""
computationally expensive, takes > 72 hours
for a list of 512 tup(s)
"""
result = self.search_geotag(
state, reg, cat, area=sub_reg
)
q.put(tup)

q.put(tup) 最终被放置在 listener 函数中,以将 tup 添加到 manager.dict( )

最佳答案

由于listenerexecute_search共享同一个队列对象,因此可能会出现竞争,其中,execute_searchlistener 之前从队列中获取“kill”,因此 listener 将永远陷入阻塞 get() 状态,因为没有更多新项目。

对于这种情况,您可以使用事件对象来通知所有进程停止:

import multiprocessing as mp
import queue

def listener(q, d, stop_event):
while not stop_event.is_set():
try:
item_to_write = q.get(timeout=0.1)
foo = d['region']
foo.add(item_to_write)
d['region'] = foo
except queue.Empty:
pass
print("Listener process stopped")

def main():
manager = mp.Manager()
stop_event = manager.Event()
q = manager.Queue()
d = manager.dict()
d['region'] = set()
pool = mp.get_context("spawn").Pool(mp.cpu_count() + 2)
watcher = pool.apply_async(listener, (q, d, stop_event))
stop_event.set()
jobs = []
for i in range(24):
job = pool.apply_async(execute_search, (q, d))
jobs.append(job)
try:
for job in jobs:
job.get(300) #get the result or throws a timeout exception after 300 seconds
except multiprocessing.TimeoutError:
pool.terminate()
stop_event.set() # stop listener process
print('process complete')


if __name__ == '__main__':
main()

关于python - 使用 Pool().apply_async 进行大型计算期间的多处理死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58497805/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com