
python - Child processes hanging with multiprocessing


I'm running into an issue where child processes hang in my Python application: only 4 of the 16 processes complete, and all of them put their results on a multiprocessing queue. According to the Python documentation ( https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues ):

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.
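A minimal reproduction of the scenario this warning describes (a toy sketch with a hypothetical worker, not the application code below): the child blocks while flushing its buffered items into the pipe, so joining it before draining the queue can deadlock.

from multiprocessing import Process, Queue

def worker(q):
    # put enough items to exceed the pipe buffer so the feeder thread blocks
    for i in xrange(100000):
        q.put(i)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    # calling p.join() here can hang forever: the child cannot exit
    # until its buffered items have been consumed
    results = [q.get() for _ in xrange(100000)]   # drain the queue first...
    p.join()                                      # ...then joining is safe
    print "collected", len(results), "items"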

I believe this may be my problem, but I do get() from the queue before joining. I'm not sure what other options I have.

from multiprocessing import Process, Queue
import time

def RunInThread(dictionary):
    startedProcesses = list()
    resultList = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        if __name__ == '__main__':
            proc = Process(target=CreateScript, args=(item, output))
            startedProcesses.append(proc)
            proc.start()

    while not output.empty():
        resultList.append(output.get())

    # we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()
    print "finished"

# defines chunk of data each thread will process
def ThreadChunk(seq, num):
    avg = len(seq) / float(num)
    out = []
    last = 0.0

    while last < len(seq):
        out.append(seq[int(last):int(last + avg)])
        last += avg

    return out

def CreateScript(scriptsToGenerate, queue):
    start = time.clock()
    for script in scriptsToGenerate:
        ...
        queue.put([script['timeInterval'], script['script']])

    print time.clock() - start
    print "I have finished"

Best Answer

The problem with your code is that while not output.empty() is not reliable (see empty). You may also run into the scenario where the interpreter reaches while not output.empty() before the processes you created have finished initializing (so the queue really is empty at that point).
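To make the race concrete, here is a small standalone sketch (a hypothetical slow worker, not from the original answer): empty() can return True simply because the child has not produced anything yet.

from multiprocessing import Process, Queue
import time

def slow_worker(q):
    time.sleep(1)   # stands in for CreateScript's start-up cost
    q.put("result")

if __name__ == '__main__':
    q = Queue()
    p = Process(target=slow_worker, args=(q,))
    p.start()
    print q.empty()   # almost always True: the child has not put anything yet
    p.join()
    print q.empty()   # typically False once the item has been flushed to the pipe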

Since you know exactly how many items will be put on the queue (namely len(dictionary)), you can simply read that many items from it:

def RunInThread(dictionary):
    startedProcesses = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

    resultList = [output.get() for _ in xrange(len(dictionary))]

    # we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"

If at some point you modify the script so that you no longer know how many items will be produced, you can use Queue.get with a reasonable timeout:

import Queue as queue  # Python 2: the Empty exception lives in the stdlib Queue module

def RunInThread(dictionary):
    startedProcesses = list()
    resultList = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

    try:
        while True:
            resultList.append(output.get(True, 2)) # block for a 2 seconds timeout, just in case
    except queue.Empty:
        pass # no more items produced

    # we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"

You may want to adjust the timeout depending on how long the actual computation in CreateScript takes.
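Alternatively, the note quoted in the question points out that a queue created through a manager does not have the flush-before-join problem. A sketch of that variant (my adaptation, reusing ThreadChunk and CreateScript from the question; the function name is hypothetical and this is not part of the original answer):

from multiprocessing import Process, Manager

def RunInThreadManaged(dictionary):
    manager = Manager()
    output = manager.Queue()          # proxy queue: children never block on a pipe buffer
    startedProcesses = []

    for item in ThreadChunk(dictionary, 16):
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

    # with a manager queue it is safe to join before draining
    for process in startedProcesses:
        process.join()

    resultList = []
    while not output.empty():         # reliable here: every producer has already exited
        resultList.append(output.get())
    return resultList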

Regarding "python - Child processes hanging with multiprocessing", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/31986786/
