gpt4 book ai didi

python - 一旦其中一个 worker 满足特定条件,就终止 Python 多处理程序

转载 作者:太空狗 更新时间:2023-10-29 20:39:27 25 4
gpt4 key购买 nike

我正在使用其多处理模块编写 Python 程序。该程序调用许多辅助函数,每个辅助函数产生一个随机数。 一旦其中一名 worker 产生了大于 0.7 的数字,我就需要终止程序

下面是我的程序,其中“如何做”部分尚未填写。任何想法?谢谢。

import time
import numpy as np
import multiprocessing as mp
import time
import sys

def f(i):
np.random.seed(int(time.time()+i))

time.sleep(3)
res=np.random.rand()
print "From i = ",i, " res = ",res
if res>0.7:
print "find it"
# terminate ???? Question: How to do this???


if __name__=='__main__':
num_workers=mp.cpu_count()
pool=mp.Pool(num_workers)
for i in range(num_workers):
p=mp.Process(target=f,args=(i,))
p.start()

最佳答案

没有进程可以阻止另一次暴力os.kill()-like sledgehammers。不要去那里。

要理智地做到这一点,您需要重新设计您的基本方法:主进程和工作进程需要相互通信。

我会充实它,但到目前为止的示例简单到无法使它有用。例如,如所写,对 rand() 的调用不会超过 num_workers 次,因此没有理由相信它们中的任何一个必须大于 0.7。

一旦worker函数长了一个循环,那么它就变得更加明显了。例如,worker 可以检查循环顶部是否设置了 mp.Event,如果是则退出。主进程会在它希望工作人员停止时设置 Event

并且工作人员可以在发现值 > 0.7 时设置不同的 mp.Event。主进程会等待那个Event,然后设置“time to stop”Event让工作人员看到,然后执行通常的循环.join()- 让工作人员干净地关闭。

编辑

这是一个可移植的、干净的解决方案,假设工作人员将继续工作,直到至少有人找到 > 0.7 的值。请注意,我从中删除了 numpy,因为它与这段代码无关。此处的代码在任何支持 multiprocessing 的平台上的任何现有 Python 下都应该可以正常工作:

import random
from time import sleep

def worker(i, quit, foundit):
print "%d started" % i
while not quit.is_set():
x = random.random()
if x > 0.7:
print '%d found %g' % (i, x)
foundit.set()
break
sleep(0.1)
print "%d is done" % i

if __name__ == "__main__":
import multiprocessing as mp
quit = mp.Event()
foundit = mp.Event()
for i in range(mp.cpu_count()):
p = mp.Process(target=worker, args=(i, quit, foundit))
p.start()
foundit.wait()
quit.set()

和一些示例输出:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

一切都干净地关闭:没有回溯,没有异常终止,没有遗留僵尸进程......干净得像哨子。

杀死它

正如@noxdafox 所指出的,有一个 Pool.terminate() 方法可以跨平台尽其所能杀死工作进程,无论他们在做什么(例如,在 Windows 上)它调用平台 TerminateProcess())。我不建议将它用于生产代码,因为突然终止进程会使各种共享资源处于不一致状态,或者让它们泄漏。 multiprocessing 文档中有各种关于此的警告,您应该向其中添加您的操作系统文档。

不过,这还是很方便的!这是使用这种方法的完整程序。请注意,我将截止值提高到 0.95,以使运行时间比眨眼时间更长:

import random
from time import sleep

def worker(i):
print "%d started" % i
while True:
x = random.random()
print '%d found %g' % (i, x)
if x > 0.95:
return x # triggers callback
sleep(0.5)

# callback running only in __main__
def quit(arg):
print "quitting with %g" % arg
# note: p is visible because it's global in __main__
p.terminate() # kill all pool workers

if __name__ == "__main__":
import multiprocessing as mp
ncpu = mp.cpu_count()
p = mp.Pool(ncpu)
for i in range(ncpu):
p.apply_async(worker, args=(i,), callback=quit)
p.close()
p.join()

和一些示例输出:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]

关于python - 一旦其中一个 worker 满足特定条件,就终止 Python 多处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36962462/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com