gpt4 book ai didi

python - 检索 pool.apply_async() 结果时出现类型错误

转载 作者:行者123 更新时间:2023-12-01 05:04:35 24 4
gpt4 key购买 nike

问题

我遇到了一个异常,我无法确定其原因,希望能帮助解决它。

背景

我有一个 Python 2.7.6 工作池,用于异步启动多个函数,在关闭并加入该池后,我检查 ApplyResult 对象以确保所有函数都成功。当我尝试检索结果时,出现以下错误:

Traceback (most recent call last):
  File "parse.py", line 798, in
    main()
  File "parse.py", line 769, in main
    produce_output_files(args.output_dir)
  File "parse.py", line 524, in produce_output_files
    print(result.get())
  File "/user/Python-2.7.6/lib/python2.7/multiprocessing/pool.py", line 554, in get
    raise self._value
TypeError: foo1() argument after ** must be a mapping, not AcquirerProxy

这是我启动子进程的代码:

def produce_output_files(output_dir):

pool = multiprocessing.Pool()
manager = multiprocessing.Manager()
db_lock = manager.Lock()
results = [pool.apply_async(func, output_dir, db_lock)
for func in [foo1, foo2, foo3]]

pool.close()
pool.join()

for result in results:
if not result.successful():
print(result.get())

return

我的所有目标函数都具有以下结构:

def foo1(output_dir, db_lock):
try:

# wrapping the whole function in a try/except block because tracebacks
# aren't pickleable, but they can be packaged into a string for pickling

except:
raise Exception("".join(traceback.format_exception(*sys.exc_info())))

采取的调试步骤

这是 worker 异常吗?

最初,我以为我只是从工作人员那里得到回溯,因为 docs for AsyncResult陈述以下内容:

If the remote call raised an exception then that exception will be reraised by get().

...我将回溯打包到单个字符串中的方式应该会导致在主进程中打印正确的回溯。为了测试这一点,我将调用的函数更改为:

def _produce_C(output_dir, db_lock):
raise Exception("test")

这个测试导致了相同的回溯,所以我知道我没有打印工作进程的异常(“测试”从未被打印)。我相信异常是由我检索结果的方式引起的,而不仅仅是子进程中异常的传播。

结果尚未准备好?

我还知道,当我对结果对象调用 get() 时,结果已准备就绪,因为我已经关闭并加入了池。为了确定这一点,我将 for 循环更改为以下内容:

    for result in results:
result.wait()
if not result.successful():
print(result.get())

这也导致了相同的回溯。

工作人员关闭且结果已过期?

我最后一次修复错误的尝试是切换池加入和检索结果的顺序,如下所示:

    for result in results:
result.wait()
if not result.successful():
print(result.get())

pool.close()
pool.join()

再次产生相同的回溯。

其他信息

this Python issue report 中所述,get() 方法通常不会生成完整的回溯,因为无法对回溯进行 pickle。然而,在上面显示的第一个调试测试中,如果 get() 实际上捕获了来自工作线程的异常,我应该仍然在回溯中看到字符串“test”。另外,我链接到的问题报告中特别提到了我将函数包装在 try/ except block 中以捕获回溯的方法作为解决方法。

最佳答案

您必须将参数传递给您使用 apply_async 调用的函数 in a tuple :

results = [pool.apply_async(func, (output_dir, db_lock)) 
for func in [foo1, foo2, foo3]]

这将修复异常。考虑一下这是 apply_async 的定义:

def apply_async(self, func, args=(), kwds={}, callback=None):

按照您当前的方式传递参数,您实际上是在这样做:

pool.apply_async(func, args=output_dir, kwargs=db_lock)

这解释了回溯:

TypeError: foo1() argument after ** must be a mapping, not AcquirerProxy.

它试图像对待kwargs一样对待db_lock。绝对不是你想要的!

关于python - 检索 pool.apply_async() 结果时出现类型错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25314394/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com