gpt4 book ai didi

Python: concurrent.futures 如何让它可取消?

转载 作者:太空狗 更新时间:2023-10-29 18:28:37 56 4
gpt4 key购买 nike

Python concurrent.futures 和 ProcessPoolExecutor 提供了一个简洁的界面来安排和监控任务。 future 连provide .cancel() 方法:

cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

不幸的是在一个类似的question (关于 asyncio)答案声称使用这个文档片段无法取消正在运行的任务,但文档并没有这么说,只有当它们正在运行并且不可取消时。

向进程提交 multiprocessing.Events 也并非易事(通过 multiprocess.Process 中的参数这样做会返回 RuntimeError)

我想做什么?我想划分一个搜索空间并为每个分区运行一个任务。但是有一个解决方案就足够了,而且这个过程是 CPU 密集型的。那么,是否有一种真正舒适的方法可以实现这一目标,并且不会抵消一开始使用 ProcessPool 带来的 yield ?

例子:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, return_when=FIRST_COMPLETED)
for d in done:
print(d.result())

print("---")
for d in not_done:
# will return false for Cancel and Result for all futures
print("Cancel: "+str(d.cancel()))
print("Result: "+str(d.result()))

最佳答案

我不知道为什么 concurrent.futures.Future 没有 .kill() 方法,但是你可以通过关闭进程来完成你想要的池与 pool.shutdown(wait=False),并手动杀死剩余的子进程。

创建一个杀死子进程的函数:

import signal, psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
try:
parent = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
children = parent.children(recursive=True)
for process in children:
process.send_signal(sig)

运行你的代码直到你得到第一个结果,然后杀死所有剩余的子进程:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())

关于Python: concurrent.futures 如何让它可取消?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42782953/

56 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com