Python multiprocessing pipe "Deadlock"


I'm having trouble with the following example code:

from multiprocessing import Lock, Process, Queue, current_process

def worker(work_queue, done_queue):
    for item in iter(work_queue.get, 'STOP'):
        print("adding ", item, "to done queue")
        #this works: done_queue.put(item*10)
        done_queue.put(item*1000) #this doesnt!
    return True

def main():
    workers = 4
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    for x in range(10):
        work_queue.put("hi"+str(x))

    for w in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    for p in processes:
        p.join()

    done_queue.put('STOP')

    for item in iter(done_queue.get, 'STOP'):
        print(item)


if __name__ == '__main__':
    main()

When the done queue gets large enough (I believe the limit is around 64k), the whole thing hangs without any warning.

What is the general way to deal with a queue that grows too large? Is there a way to remove elements on the fly, as soon as they have been processed? The Python docs recommend removing the p.join(), but in my real application I cannot estimate when the processes will have finished. Is there a simple solution to this problem other than looping indefinitely and using .get_nowait()?

Best answer

This works for me on 3.4.0alpha4, 3.3, 3.2, 3.1 and 2.6. It tracebacks on 2.7 and 3.0. I pylint'd it, by the way.

#!/usr/local/cpython-3.3/bin/python

'''SSCCE for a queue deadlock'''

import sys
import multiprocessing

def worker(workerno, work_queue, done_queue):
    '''Worker function'''
    #reps = 10 # this worked for the OP
    #reps = 1000 # this worked for me
    reps = 10000 # this didn't

    for item in iter(work_queue.get, 'STOP'):
        print("adding", item, "to done queue")
        #this works: done_queue.put(item*10)
        for thing in item * reps:
            #print('workerno: {}, adding thing {}'.format(workerno, thing))
            done_queue.put(thing)
    done_queue.put('STOP')
    print('workerno: {0}, exited loop'.format(workerno))
    return True

def main():
    '''main function'''
    workers = 4
    work_queue = multiprocessing.Queue(maxsize=0)
    done_queue = multiprocessing.Queue(maxsize=0)
    processes = []

    for integer in range(10):
        work_queue.put("hi"+str(integer))

    for workerno in range(workers):
        dummy = workerno
        process = multiprocessing.Process(target=worker, args=(workerno, work_queue, done_queue))
        process.start()
        processes.append(process)
        work_queue.put('STOP')

    itemno = 0
    stops = 0
    while True:
        item = done_queue.get()
        itemno += 1
        sys.stdout.write('itemno {0}\r'.format(itemno))
        if item == 'STOP':
            stops += 1
        if stops == workers:
            break
    print('exited done_queue empty loop')

    for workerno, process in enumerate(processes):
        print('attempting process.join() of workerno {0}'.format(workerno))
        process.join()

    done_queue.put('STOP')

if __name__ == '__main__':
    main()

HTH
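
For comparison, here is a minimal sketch (not part of the original answer) of the same drain-before-join pattern applied to the question's own code. The names worker, work_queue and done_queue are the ones from the question; the per-worker 'STOP' sentinel on done_queue mirrors what the accepted answer does:

from multiprocessing import Process, Queue

def worker(work_queue, done_queue):
    for item in iter(work_queue.get, 'STOP'):
        done_queue.put(item * 1000)
    done_queue.put('STOP')  # one sentinel per worker, so the parent knows this child is done

def main():
    workers = 4
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    for x in range(10):
        work_queue.put("hi" + str(x))

    for _ in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    # Drain done_queue *before* join(): a child cannot exit while its queued data
    # is still sitting in the pipe buffer, so the parent must keep consuming.
    stops = 0
    while stops < workers:
        item = done_queue.get()
        if item == 'STOP':
            stops += 1
        else:
            print(item)

    for p in processes:  # safe now that every child's output has been consumed
        p.join()

if __name__ == '__main__':
    main()

The point of the pattern is simply that the results are consumed before the parent blocks in join(), so no child can get stuck writing to a full pipe while the parent waits for it to exit.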

Regarding Python multiprocessing pipe "Deadlock", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/20167735/
