gpt4 book ai didi

python - 无法立即停止\杀死 multiprocessing.Pool 产生的所有进程

转载 作者:太空狗 更新时间:2023-10-29 21:58:13 29 4
gpt4 key购买 nike

当出现任何错误\异常时,我需要停止\杀死所有进程。我在 StackOwerflow 解决方案中发现使用 psutil 杀死所有进程,但有时我会遇到一个问题 - 当 psutil 杀死子进程和主进程时,新进程可能会启动并编码继续执行。

import psutil

class MyClass:
parent_pid = 0
ids_list = range(300)

def main(self):
self.parent_pid = os.getpid()
pool = multiprocessing.Pool(3)

for osm_id in self.ids_list:
pool.apply_async(self.handle_country_or_region,
kwds=dict(country_id=osm_id),
error_callback=self.kill_proc_tree)

pool.close()
pool.join()

def kill_proc_tree(self, including_parent=True):
parent = psutil.Process(self.parent_pid)
children = parent.children(recursive=True)

for child in children:
child.kill()
psutil.wait_procs(children, timeout=5)

if including_parent:
parent.kill()
parent.wait(5)

def handle_country_or_region(self, country_id=None, queue=None):
pass
# here I do some task

看来我需要终止池而不是终止进程,但在这种情况下,如果我这样做了

pool.close()
pool.terminate()
pool.join()

我的终端停止做任何事情,新行完全是空的(即没有“>>>”)并且没有任何反应。

理想情况下,我希望有下一个流程:如果有任何错误\异常,停止\终止所有代码执行并返回到终端中的交互式提示。

谁能帮我让它正常工作?我使用 Python 3.5 和 Ubuntu 15.10

最佳答案

解决方案非常简单——将“killer”函数放在“main”中。

完整代码如下:

class MyClass:
ids_list = range(300)

def main(self):
pool = multiprocessing.Pool(3)

def kill_pool(err_msg):
print(err_msg)
pool.terminate()

for osm_id in self.ids_list:
pool.apply_async(self.handle_country_or_region,
kwds=dict(country_id=osm_id),
error_callback=kill_pool)

pool.close()
pool.join()

def handle_country_or_region(self, country_id=None, queue=None):
pass # here I do some task

如果有人需要使用queue,下面是代码的扩展变体,它展示了如何以正确的方式完成queue,避免出现僵尸进程:

import pickle
import os
import multiprocessing

class MyClass:
ids_list = range(300)
folder = os.path.join(os.getcwd(), 'app_geo')
STOP_TOKEN = 'stop queue'

def main(self):

# >>> Queue part shared between processes <<<
manager = multiprocessing.Manager()
remove_id_queue = manager.Queue()

remove_id_process = multiprocessing.Process(target=self.remove_id_from_file,
args=(remove_id_queue,))
remove_id_process.start()
# >>> End of queue part <<<

pool = multiprocessing.Pool(3)

def kill_pool(err_msg):
print(err_msg)
pool.terminate()

for osm_id in self.ids_list:
pool.apply_async(self.handle_country_or_region,
kwds=dict(country_id=osm_id),
error_callback=kill_pool)

pool.close()
pool.join()

# >>> Anti-zombie processes queue part <<<
remove_id_queue.put(self.STOP_TOKEN)
remove_id_process.join()
manager.shutdown()
# >>> End

def handle_country_or_region(self, country_id=None, queue=None):
# here I do some task
queue.put(country_id)

def remove_id_from_file(self, some_queue):
while True:
osm_id = some_queue.get()
if osm_id == self.STOP_TOKEN:
return
self.ids_list.remove(osm_id)
with open(self.folder + '/ids_list.pickle', 'wb') as f:
pickle.dump(self.ids_list, f)

关于python - 无法立即停止\杀死 multiprocessing.Pool 产生的所有进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36403855/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com