gpt4 book ai didi

python - Python 多处理中提前终止时的死锁

转载 作者:太空宇宙 更新时间:2023-11-03 15:15:51 30 4
gpt4 key购买 nike

我正在 Python 中创建一个 multiprocessing.Queue 并将 multiprocessing.Process 实例添加到此 Queue

我想添加一个在每个作业之后执行的函数调用,它检查特定任务是否成功。如果是这样,我想清空队列并终止执行。

我的Process类是:

class Worker(multiprocessing.Process):

def __init__(self, queue, check_success=None, directory=None, permit_nonzero=False):
super(Worker, self).__init__()
self.check_success = check_success
self.directory = directory
self.permit_nonzero = permit_nonzero
self.queue = queue

def run(self):
for job in iter(self.queue.get, None):
stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
f_out.write(stdout)
if callable(self.check_success) and self.check_success(job):
# Terminate all remaining jobs here
pass

我的队列设置在这里:

class LocalJobServer(object):

@staticmethod
def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False, time=None, *args, **kwargs):
if check_success and not callable(check_success):
msg = "check_success option requires a callable function/object: {0}".format(check_success)
raise ValueError(msg)

# Create a new queue
queue = multiprocessing.Queue()
# Create workers equivalent to the number of jobs
workers = []
for _ in range(nproc):
wp = Worker(queue, check_success=check_success, directory=directory, permit_nonzero=permit_nonzero)
wp.start()
workers.append(wp)
# Add each command to the queue
for cmd in command:
queue.put(cmd, timeout=time)
# Stop workers from exiting without completion
for _ in range(nproc):
queue.put(None)
for wp in workers:
wp.join()

函数调用mbkit.dispatch.cexectools.cexec()subprocess.Popen的包装器,并返回p.stdout

Worker 类中,我编写了条件来检查作业是否成功,并尝试使用 while 清空 Queue 中的剩余作业 循环,即我的 Worker.run() 函数如下所示:

def run(self):
for job in iter(self.queue.get, None):
stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
f_out.write(stdout)
if callable(self.check_success) and self.check_success(job):
break
while not self.queue.empty():
self.queue.get()

虽然这有时有效,但通常会死锁,我唯一的选择是Ctrl-C。我知道 .empty() 不可靠,因此我的问题。

关于如何实现这种提前终止功能有什么建议吗?

最佳答案

这里没有陷入僵局。它仅与 multiprocessing.Queue 的行为相关,因为 get 方法默认是阻塞的。因此,当您在空队列上调用 get 时,调用会停止,等待下一个元素准备好。您可以看到,您的一些工作线程将会停止,因为当您使用循环 while not self.queue.empty() 来清空它时,您会删除所有 None 哨兵,并且您的一些工作人员将阻塞在空的 Queue 上,如以下代码所示:

from multiprocessing import Queue
q = Queue()
for e in iter(q.get, None):
print(e)

要在队列为空时收到通知,您需要使用非阻塞调用。例如,您可以使用 q.get_nowait,或在 q.get(timeout=1) 中使用超时。当队列为空时,两者都会抛出 multiprocessing.queues.Empty 异常。因此,您应该将 Worker for job in iter(...): 循环替换为以下内容:

while not queue.empty():
try:
job = queue.get(timeout=.1)
except multiprocessing.queues.Empty:
continue
# Do stuff with your job

如果您不想在任何时候陷入困境。

对于同步部分,我建议使用同步原语,例如 multiprocessing.Conditionmultiprocessing.Event 。这比值(value)更清晰,因为它们是为此目的而设计的。像这样的事情应该有帮助

def run(self):
while not queue.empty():
try:
job = queue.get(timeout=.1)
except multiprocessing.queues.Empty:
continue
if self.event.is_set():
continue
stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
f_out.write(stdout)
if callable(self.check_success) and self.check_success(job):
self.event.set()
print("Worker {} terminated cleanly".format(self.name))

event = multiprocessing.Event()

请注意,也可以使用multiprocessing.Pool来避免处理队列和工作人员。但由于您需要一些同步原语,因此设置可能会稍微复杂一些。像这样的东西应该有效:

 def worker(job, success, check_success=None, directory=None, permit_nonzero=False):
if sucess.is_set():
return False
stdout = mbkit.dispatch.cexectools.cexec([job], directory=self.directory, permit_nonzero=self.permit_nonzero)
with open(job.rsplit('.', 1)[0] + '.log', 'w') as f_out:
f_out.write(stdout)
if callable(self.check_success) and self.check_success(job):
success.set()
return True

# ......
# In the class LocalJobServer
# .....

def sub(command, check_success=None, directory=None, nproc=1, permit_nonzero=False):

mgr = multiprocessing.Manager()
success = mgr.Event()

pool = multiprocessing.Pool(nproc)
run_args = [(cmd, success, check_success, directory, permit_nonzero)]
result = pool.starmap(worker, run_args)

pool.close()
pool.join()

请注意,我使用管理器,因为您无法直接将 multiprocessing.Event 作为参数传递。您还可以使用 Pool 的参数 initializerinitargs 在每个工作线程中启动全局 success 事件并避免依赖于Manager,但稍微复杂一些。

关于python - Python 多处理中提前终止时的死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43900219/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com