gpt4 book ai didi

python-3.x - python multiprocessing.Queue 没有处理所有的值

转载 作者:行者123 更新时间:2023-12-04 12:53:07 28 4
gpt4 key购买 nike

我有一些 multiprocessing.Queues 列表可以在两个进程之间进行通信。我想发送一个“无”作为每个队列上的最后一个值,以向第二个进程指示数据流的结束,但这似乎并不总是有效(我在某些队列中得到了 None 但不是在每一个中),除非我在 put() 指令之一之后添加至少一个 print() 。
澄清:它有时在没有打印的情况下也能工作,但并非总是如此。此外,当我放置打印说明时,到目前为止 100% 的时间都可以。
我还尝试为 put() 方法设置 block=True ,但这似乎没有任何区别。
我发现这个解决方案试图调试问题,找出在将值放入队列或获取它们时是否遇到问题,但是当我在 put() 端放置一个 print() 时,代码总是作品。
编辑:
一个简化但完整的版本,部分重现了问题:我已经确定了两个可能有问题的部分,在代码中标记为 CODEBLOCK1 和 CODEBLOCK2:如果我取消注释其中任何一个,代码会按预期工作。
最小示例.py:

import multiprocessing, processes


def MainProcess():

multiprocessing.set_start_method("spawn")
metricsQueue = multiprocessing.Queue() # Virtually infinite size

# Define and start the parallel processes
process1 = multiprocessing.Process(target=processes.Process1,
args=(metricsQueue,))

process2 = multiprocessing.Process(target=processes.Process2,
args=(metricsQueue,))

process1.start()
process2.start()

process1.join()
process2.join()


# Script entry point
if __name__ == '__main__':

MainProcess()
进程.py:
import random, queue

def Process1(metricsQueue):

print("Start of process 1")

# Cancel join for the queues, so that upon killing this process, the main process does not block on join if there
# are still elements on the queues -> We don't mind losing data if the process is killed.
# Start of CODEBLOCK1
metricsQueue.cancel_join_thread()
# End of CODEBLOCK1

longData = random.sample(range(10205, 26512), 992)

# Start of CODEBLOCK2
# Put a big number of data in the queue
for data in longData:
try:
metricsQueue.put(data, block=False)

except queue.Full:
print("Error")
# End of CODEBLOCK2


# Once finished, push a None through all queues to mark the end of the process
try:
metricsQueue.put(None, block=False)
print("put None in metricsQueue")

except queue.Full:
print("Error")

print("End of process 1")



def Process2(metricsQueue):

print("Start of process 2")

newMetricsPoint = 0
recoveredMetrics = []

while (newMetricsPoint is not None):

# Metrics point
try:
newMetricsPoint = metricsQueue.get(block=False)

except queue.Empty:
pass

else:
if (newMetricsPoint is not None):
recoveredMetrics.append(newMetricsPoint)
print(f"got {len(recoveredMetrics)} points so far")

else:
print("get None from metricsQueue")

print("End of process 2")
这段代码给出了这样的结果,第二个过程永远不会结束,因为卡在了 wile 循环中:
Start of process 1
Start of process 2
put None in metricsQueue 0
End of process 1
如果我评论 CODEBLOCK1 或 CODEBLOCK2,代码将按预期工作:
Start of process 1
Start of process 2
put None in metricsQueue 0
End of process 1
get None from metricsQueue 0
End of process 2

最佳答案

We don't mind losing data if the process is killed.


这个假设是不正确的。收盘信号 None是数据的一部分;丢失它会阻止同级进程关闭。
如果进程依赖关闭信号,则不要 .cancel_join_thread()对于用于发送此信号的队列。

关于python-3.x - python multiprocessing.Queue 没有处理所有的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69574959/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com