0: self.queue.get(timeout=3) “读者”在-6ren">
gpt4 book ai didi

python - multiprocessing.Queue deadlocks after "reader"进程死亡

转载 作者:太空狗 更新时间:2023-10-30 01:29:12 28 4
gpt4 key购买 nike

我一直在玩 multiprocessing 包并注意到在以下情况下队列可能会因读取而死锁:

  1. “阅读器”进程正在使用 get 超时> 0:

    self.queue.get(timeout=3)
  2. “读者”在 get 时死亡由于超时而阻塞。

在该队列被永久锁定之后。

演示问题的应用

我创建了两个子进程“Worker”(放入队列)和“Receiver”(从队列中取出)。父进程也定期检查他的 child 是否are alive如果需要,开始新的 child 。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import procname
import time

class Receiver(multiprocessing.Process):
''' Reads from queue with 3 secs timeout '''

def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue

def run(self):
procname.setprocname('Receiver')
while True:
try:
msg = self.queue.get(timeout=3)
print '<<< `{}`, queue rlock: {}'.format(
msg, self.queue._rlock)
except multiprocessing.queues.Empty:
print '<<< EMPTY, Queue rlock: {}'.format(
self.queue._rlock)
pass


class Worker(multiprocessing.Process):
''' Puts into queue with 1 sec sleep '''

def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue

def run(self):
procname.setprocname('Worker')
while True:
time.sleep(1)
print 'Worker: putting msg, Queue size: ~{}'.format(
self.queue.qsize())
self.queue.put('msg from Worker')


if __name__ == '__main__':
queue = multiprocessing.Queue()

worker = Worker(queue)
worker.start()

receiver = Receiver(queue)
receiver.start()

while True:
time.sleep(1)
if not worker.is_alive():
print 'Restarting worker'
worker = Worker(queue)
worker.start()
if not receiver.is_alive():
print 'Restarting receiver'
receiver = Receiver(queue)
receiver.start()

进程树在 ps 中的样子

bash
\_ python queuetest.py
\_ Worker
\_ Receiver

控制台输出

$ python queuetest.py
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Restarting receiver <-- killed Receiver with SIGTERM
Worker: putting msg, Queue size: ~0
Worker: putting msg, Queue size: ~1
Worker: putting msg, Queue size: ~2
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~3
Worker: putting msg, Queue size: ~4
Worker: putting msg, Queue size: ~5
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~6
Worker: putting msg, Queue size: ~7

有什么办法可以绕过这个吗?使用get_nowait与 sleep 相结合似乎是某种解决方法,但它不会“按原样”读取数据。

系统信息

$ uname -sr
Linux 3.11.8-200.fc19.x86_64

$ python -V
Python 2.7.5

In [3]: multiprocessing.__version__
Out[3]: '0.70a1'

“一切正常”的解决方案

在写这个问题时,我对 Receiver 类进行了一些愚蠢的修改:

class Receiver(multiprocessing.Process):

def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue

def run(self):
procname.setprocname('Receiver')
while True:
time.sleep(1)
while True:
try:
msg = self.queue.get_nowait()
print '<<< `{}`, queue rlock: {}'.format(
msg, self.queue._rlock)
except multiprocessing.queues.Empty:
print '<<< EMPTY, Queue rlock: {}'.format(
self.queue._rlock)
break

但是我觉得不太好。

最佳答案

这可能是因为 Queue.get() 中的 *not_empty.release()* 从未发生过(进程已经被杀死)。您是否尝试在 Receiver 中捕获 TERM 信号并在退出前释放 Queue mutex?

关于python - multiprocessing.Queue deadlocks after "reader"进程死亡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21349850/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com