
python - Can I use a ProcessPoolExecutor from within a Future?


I have a program that takes a list. For each value in that list it retrieves another list and processes that one.

Essentially it is a three-level-deep tree, with potentially expensive processing at every node.

Each node needs to be able to process the results of its children.

What I want to do is map the input list of the first layer to the results of each node. Inside each of those processes, though, I also want to map the results of the next layer.

My worry is that each layer would end up with its own maximum number of workers. If possible, I would like them to share a single process pool; otherwise all the process switching will hurt performance.

Is there a way, using concurrent.futures or some other method, to have every layer share the same process pool?

An example would be:

def main():
    my_list = [1, 2, 3, 4]
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(my_function, zip(my_list, [executor] * len(my_list)))
        # process results

def my_function(args):
    list = args[0]
    executor = args[1]
    new_list = process(list)
    results = executor.map(second_function, new_list)
    # process results
    # return processed results

def second_function(values):
    ...

That way, every child process would draw from the same pool.

Alternatively, could I do something like (but not exactly):

import concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor

and have every call to executor draw from the same process pool?
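What that second idea usually amounts to in practice is a module that owns a single pool and that everything else imports. A minimal sketch, assuming a hypothetical module name shared_pool.py; note that this only shares the pool within the parent process and, as the answer below explains, it does not by itself let tasks already running in the pool wait on further tasks in the same pool:

# shared_pool.py -- hypothetical helper module: one lazily created pool,
# reused by every caller in the parent process that imports this module.
import concurrent.futures

_executor = None

def get_executor():
    # Create the shared ProcessPoolExecutor on first use, then reuse it.
    global _executor
    if _executor is None:
        _executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    return _executor

# elsewhere in the program:
#   from shared_pool import get_executor
#   results = list(get_executor().map(my_function, my_list))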

Best Answer

The problem is that your pool has 4 workers while you are trying to wait on roughly 20 tasks at once, so there are not enough workers to do what you want.

In other words: my_function executes in a worker thread. That thread blocks as soon as map is called, so there is one worker fewer available to execute the very calls that map submitted; the futures end up blocking the thread they need.
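A minimal sketch of that starvation, using a ThreadPoolExecutor so the executor object can be handed to its own tasks at all (a ProcessPoolExecutor cannot normally be pickled into its workers); with a single worker the nested map blocks forever:

import concurrent.futures

def double(x):
    return x * 2

def outer(args):
    pool, values = args
    # This nested map() blocks the only worker thread while it waits for
    # sub-tasks that need that very same worker, so nothing can progress.
    return list(pool.map(double, values))

if __name__ == '__main__':
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    outer_future = pool.submit(outer, (pool, [1, 2, 3]))
    try:
        print(outer_future.result(timeout=2))
    except concurrent.futures.TimeoutError:
        print('deadlocked: the single worker is blocked on its own sub-tasks')
    # Cancel the stranded inner tasks so the interpreter can exit cleanly
    # (cancel_futures requires Python 3.9+).
    pool.shutdown(wait=False, cancel_futures=True)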

My solution is to use yield and yield from statements that hand futures back, which removes the blocking of both the futures and the thread: all the futures are created, then a yield suspends the current execution and frees the thread, which can then go on to execute the mapped futures. Once a future completes, a registered callback executes the generator's next() step.
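Stripped of the executor subclass shown further down, the mechanism is roughly the following sketch (not the answer's actual code): the task is written as a generator that yields each future instead of blocking on it, and a done callback drives the generator forward step by step:

from concurrent.futures import Future

def step(generator, outer_future):
    # Advance the task generator to its next yield; this never blocks a worker.
    try:
        yielded_future = next(generator)
    except StopIteration as stop:
        # The generator returned, so the whole task is finished.
        outer_future.set_result(stop.value)
    else:
        # Resume only once the yielded future completes; until then the
        # worker thread is free to run other pending tasks.
        yielded_future.add_done_callback(lambda _: step(generator, outer_future))

The RecursiveThreadPoolExecutor below wraps essentially this stepping logic inside its overridden submit().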

To do the same for objects that already exist, via proxies, this question has to be solved first: How to properly set up multiprocessing proxy objects for objects that already exist

So, let us run the recursion over the list [[1,[2,[3,3,3],2],1],0,0].

We can expect the following output:

tasks: [[1, [2, [3, 3, 3], 2], 1], 0, 0]
tasks: [1, [2, [3, 3, 3], 2], 1]
tasks: 0
tasks: 0
tasks: 1
tasks: [2, [3, 3, 3], 2]
tasks: 1
tasks: 2
tasks: [3, 3, 3]
tasks: 2
tasks: 3
tasks: 3
tasks: 3
v: 15

Here is the code that introduces a recursion-enabled ThreadPoolExecutor:

import traceback
from concurrent.futures.thread import *
from concurrent.futures import *
from concurrent.futures._base import *
##import hanging_threads

class RecursiveThreadPoolExecutor(ThreadPoolExecutor):

    # updated version here: https://gist.github.com/niccokunzmann/9170072

    def _submit(self, fn, *args, **kwargs):
        return ThreadPoolExecutor.submit(self, fn, *args, **kwargs)

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        real_future = Future()
        def generator_start():
            try:
##                print('start', fn, args, kwargs)
                generator = fn(*args, **kwargs)
##                print('generator:', generator)
                def generator_next():
                    try:
##                        print('next')
                        try:
                            future = next(generator)
                        except StopIteration as stop:
                            real_future.set_result(stop.args[0])
                        else:
                            if future is None:
                                self._submit(generator_next)
                            else:
                                future.add_done_callback(lambda future: generator_next())
                    except:
                        traceback.print_exc()
                self._submit(generator_next)
##                print('next submitted 1')
            except:
                traceback.print_exc()
        self._submit(generator_start)
        return real_future

    def recursive_map(self, fn, *iterables, timeout=None):
        """Returns a iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            yield from fs
            return fs
        return result_iterator()

if __name__ == '__main__':

    def f(args):
        executor, tasks = args
        print('tasks:', tasks)
        if type(tasks) == int:
            return tasks
        # waiting for all futures without blocking the thread
        futures = yield from executor.recursive_map(f, [(executor, task) for task in tasks])
        return sum([future.result() for future in futures])

    with RecursiveThreadPoolExecutor(max_workers=1) as executor:
        r = executor.map(f, [(executor, [[1,[2,[3,3,3],2],1],0,0],)] * 1)
        import time
        time.sleep(0.1)

        for v in r:
            print('v: {}'.format(v))

An updated version can be found here: https://gist.github.com/niccokunzmann/9170072

Sadly, I am not able to implement this for processes with the multiprocessing machinery right now. It should be possible, though: all that needs to be done is to create proxy objects for the generator_start and generator_next functions. If you do so, please let me know.

Solving the proxying of methods would also answer this question: How to properly set up multiprocessing proxy objects for objects that already exist

Regarding "python - Can I use a ProcessPoolExecutor from within a Future?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/21960514/
