gpt4 book ai didi

python - 池.apply_async() : nested function is not executed

转载 作者:太空宇宙 更新时间:2023-11-04 01:57:19 26 4
gpt4 key购买 nike

我越来越熟悉 Python 的 multiprocessing 模块。以下代码按预期工作:

#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
print x
return

pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()

但是,现在,如果我在上面的代码周围包装一个函数,则不会执行 print 语句(或者至少重定向输出):

#outputs nothing
def run():
def run_one(x):
print x
return

pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()

如果我将 run_one 定义移到 run 之外,当我调用 run() 时,输出又是预期的:

#outputs 0 1 2 3
def run_one(x):
print x
return

def run():
pool = Pool(processes=12)
for i in range(4):
pool.apply_async(run_one, (i,))
pool.close()
pool.join()

我在这里错过了什么?为什么第二个片段不打印任何东西?如果我只是调用 run_one(i) 函数而不是使用 apply_async,所有三个代码的输出都相同。

最佳答案

Pool 需要 pickle(序列化)它发送给它的工作进程的所有东西。 Pickling 实际上只保存函数的名称,而 unpickling 需要按名称重新导入函数。为此,函数需要在顶层定义,嵌套函数将不能被子进程导入,并且已经尝试 pickle 它们会引发异常:

from multiprocessing.connection import _ForkingPickler

def run():
def foo(x):
pass
_ForkingPickler.dumps(foo) # multiprocessing custom pickler;
# same effect with pickle.dumps(foo)

run()
# Out:
Traceback (most recent call last):
...
AttributeError: Can't pickle local object 'run.<locals>.foo'

你看不到异常的原因是,因为 Pool 已经开始在父级的 pickling 任务中捕获异常,并且只有在你调用 .get() 时才重新引发它们 在调用 pool.apply_async() 时立即获得的 AsyncResult 对象上。

这就是为什么(使用 Python 2)你最好总是像这样使用它,即使你的目标函数不返回任何东西(仍然返回隐式 None):

    results = [pool.apply_async(foo, (i,)) for i in range(4)]
# `pool.apply_async()` immediately returns AsyncResult (ApplyResult) object
for res in results:
res.get()

Pool.map()Pool.starmap() 等非异步池方法在底层使用相同的(异步)低级函数异步同胞,但它们另外为您调用 .get(),因此您总是会看到这些方法的异常。

Python 3 有一个用于异步池方法的 error_callback 参数,您可以使用它来处理异常。

关于python - 池.apply_async() : nested function is not executed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56533827/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com