gpt4 book ai didi

python - 为什么这里需要 Queue.join() ?

转载 作者:太空宇宙 更新时间:2023-11-03 10:55:03 24 4
gpt4 key购买 nike

我正在学习python的threading模块,写了下面的代码帮助自己理解

from Queue import Queue
import threading

lock = threading.Lock()
MAX_THREADS = 8
q = Queue()
count = 0

# some i/o process
def io_process(x):
pass

# process that deals with shared resources
def shared_resource_process(x):
pass

def func():
global q, count
while not q.empty():
x = q.get()
io_process(x)
if lock.acquire():
shared_resource_process(x)
print '%s is processing %r' %(threading.currentThread().getName(), x)
count += 1
lock.release()

def main():
global q
for i in range(40):
q.put(i)

threads = []
for i in range(MAX_THREADS):
threads.append(threading.Thread(target=func))

for t in threads:
t.start()

for t in threads:
t.join()

print 'multi-thread done.'
print count == 40

if __name__ == '__main__':
main()

输出像这样卡住了:

Thread-1 is processing 32
Thread-8 is processing 33
Thread-6 is processing 34
Thread-2 is processing 35
Thread-5 is processing 36
Thread-3 is processing 37
Thread-7 is processing 38
Thread-4 is processing 39

请注意 main() 中的打印未执行,这意味着某些线程挂起/阻塞?

然后我通过添加 q.task_done() 修改 func() 方法:

if lock.acquire():
shared_resource_process(x)
print '%s is processing %r' %(threading.currentThread().getName(), x)
count += 1
q.task_done() # why is this necessary ?
lock.release()

现在所有线程都如我所料终止并得到正确的输出:

Thread-6 is processing 36
Thread-4 is processing 37
Thread-3 is processing 38
Thread-7 is processing 39
multi-thread done.
True

Process finished with exit code 0

我阅读了 Queue.Queue 的文档 here并查看 task_done() 与 queue.join() 一起使用以确保处理队列中的所有项目。但是因为我没有在 main() 中调用 queue.join(),为什么 task_done() 在 func() 中是必需的?当我错过 task_done() 代码时,线程挂起/阻塞的原因是什么?

最佳答案

您的代码中存在竞争条件。想象一下,您在 Queue 中只剩下一个项目,并且您将只使用两个线程而不是 8 个。然后会发生以下事件序列:

  1. 线程 A 调用 q.empty检查它是否为空。由于队列中有一项结果为False,因此执行了循环体。
  2. 在线程 A 调用 q.get 之前有一个上下文切换,线程 B 开始运行。
  3. 线程B调用q.empty,队列中还有一项,结果为False,执行循环体。
  4. 线程 B 调用 q.get没有参数,它会立即返回队列中的最后一项。然后线程 B 处理项目并退出,因为 q.empty 返回 True
  5. 线程 A 开始运行。因为它已经在第 1 步中调用了 q.empty,所以接下来会调用 q.get,但这将永远阻塞,因此您的程序不会终止。

您可以通过导入 time 并稍微更改循环来模拟上述行为:

while not q.empty():
time.sleep(0.1) # Force context switch
x = q.get()

请注意,无论是否调用 task_done,行为都是相同的。

那么为什么添加 task_done 有帮助?默认情况下,Python 2 将每 100 条解释器指令进行一次上下文切换,因此添加代码可能会改变上下文切换发生的位置。参见 another questionlinked PDF为了更好的解释。在我的机器上,无论 task_done 是否存在,程序都不会挂起,所以这只是一个推测,是什么导致了你发生这种情况。

如果你想修复这个行为,你可以有无限循环并将​​参数传递给 get 指示它不要阻塞。这会导致 get 最终抛出 Queue.Empty 异常,您可以捕获该异常然后中断循环:

from Queue import Queue, Empty

def func():
global q, count
while True:
try:
x = q.get(False)
except Empty:
break
io_process(x)
if lock.acquire():
shared_resource_process(x)
print '%s is processing %r' %(threading.currentThread().getName(), x)
count += 1
lock.release()

关于python - 为什么这里需要 Queue.join() ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42558797/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com